From aab24f1cdad27df03ac702b50cc55b988168923e Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 5 Sep 2023 17:45:06 +0200 Subject: [PATCH 1/4] Store: always sort, just compare labelset in proxy heap Signed-off-by: Michael Hoffmann --- pkg/store/bucket.go | 2 +- pkg/store/flushable.go | 16 +--------- pkg/store/prometheus.go | 2 +- pkg/store/proxy_heap.go | 50 ++----------------------------- pkg/store/proxy_heap_test.go | 58 +++++++++++++++++++----------------- pkg/store/tsdb.go | 2 +- 6 files changed, 37 insertions(+), 93 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4a8eae4572..5b4ce2bb1d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1334,7 +1334,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index c41b67d152..60722d4b0a 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -9,7 +9,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. @@ -20,23 +19,10 @@ type flushableServer interface { func newFlushableServer( upstream storepb.Store_SeriesServer, - labelNames stringset.Set, - replicaLabels []string, ) flushableServer { - if labelNames.HasAny(replicaLabels) { - return &resortingServer{Store_SeriesServer: upstream} - } - return &passthroughServer{Store_SeriesServer: upstream} + return &resortingServer{Store_SeriesServer: upstream} } -// passthroughServer is a flushableServer that forwards all data to -// an upstream server without additional processing. -type passthroughServer struct { - storepb.Store_SeriesServer -} - -func (p *passthroughServer) Flush() error { return nil } - // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. // Data is resorted and sent to an upstream server upon calling Flush. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 244ae5592d..4c69df5d4f 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) + s := newFlushableServer(seriesSrv) extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ea18b134d..757a9fa0d3 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -164,9 +164,7 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse { // tournament trees need n-1 auxiliary nodes so there // might not be much of a difference. type ProxyResponseHeap struct { - nodes []ProxyResponseHeapNode - iLblsScratch labels.Labels - jLblsScratch labels.Labels + nodes []ProxyResponseHeapNode } func (h *ProxyResponseHeap) Less(i, j int) bool { @@ -174,26 +172,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - // Response sets are sorted before adding external labels. - // This comparison excludes those labels to keep the same order. - iStoreLbls := h.nodes[i].rs.StoreLabels() - jStoreLbls := h.nodes[j].rs.StoreLabels() - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - copyLabels(&h.iLblsScratch, iLbls) - copyLabels(&h.jLblsScratch, jLbls) - - var iExtLbls, jExtLbls labels.Labels - h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) - h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) - - c := labels.Compare(h.iLblsScratch, h.jLblsScratch) - if c != 0 { - return c < 0 - } - return labels.Compare(iExtLbls, jExtLbls) < 0 + return labels.Compare(iLbls, jLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { @@ -794,34 +776,6 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels return l } -// dropLabels removes labels from the given label set and returns the removed labels. -func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) { - cutoff := len(l) - for i := 0; i < len(l); i++ { - if i == cutoff { - break - } - if _, ok := labelsToDrop[l[i].Name]; !ok { - continue - } - - lbl := l[i] - l = append(append(l[:i], l[i+1:]...), lbl) - cutoff-- - i-- - } - - return l[:cutoff], l[cutoff:] -} - -func copyLabels(dest *labels.Labels, src labels.Labels) { - if len(*dest) < cap(src) { - *dest = make([]labels.Label, len(src)) - } - *dest = (*dest)[:len(src)] - copy(*dest, src) -} - // sortWithoutLabels removes given labels from series and re-sorts the series responses that the same // series with different labels are coming right after each other. Other types of responses are moved to front. func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) { diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index fdfec178ca..50fe2d46be 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -82,33 +82,6 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), }, }, - { - title: "merge duplicated sets that were ordered before adding external labels", - input: []respSet{ - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - }, - exp: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - }, { title: "merge repeated series in stores with different external labels", input: []respSet{ @@ -190,6 +163,37 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), }, }, + { + title: "test", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + }, } { t.Run(tcase.title, func(t *testing.T) { h := NewProxyResponseHeap(tcase.input...) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73604b9236..f1bd44040d 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { From ae8b083d1d684373b12b595e8b7a7ed0e7193480 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sat, 9 Sep 2023 13:44:14 +0200 Subject: [PATCH 2/4] Store: add escape hatch to skip store resorting Signed-off-by: Michael Hoffmann --- pkg/store/acceptance_test.go | 71 +++++++++++++++++++----------------- pkg/store/bucket.go | 54 ++++++++------------------- pkg/store/bucket_test.go | 3 ++ pkg/store/flushable.go | 27 +++++++++++++- pkg/store/prometheus.go | 3 +- pkg/store/proxy_heap.go | 10 +++-- pkg/store/tsdb.go | 2 +- 7 files changed, 90 insertions(+), 80 deletions(-) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index f6a5ef55ec..2d5a7aebfd 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -238,9 +238,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -251,7 +251,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -270,9 +270,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -295,8 +295,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -306,9 +306,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -332,8 +332,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -352,8 +352,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -363,9 +363,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -376,7 +376,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -387,8 +387,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), }, }, { @@ -422,9 +422,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -435,8 +435,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -489,8 +489,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -501,7 +501,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -512,8 +512,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -545,7 +545,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -557,7 +557,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -567,9 +567,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), }, }, @@ -580,8 +580,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -591,8 +591,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -706,12 +706,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) + receivedLabels := make([]labels.Labels, 0) for _, s := range srv.SeriesSet { receivedLabels = append(receivedLabels, s.PromLabels()) } - slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) - slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) + testutil.Equals(t, c.expectedLabels, receivedLabels) }) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 5b4ce2bb1d..e6ec63d019 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1334,7 +1334,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, sortingStrategyNone) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1464,44 +1465,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - // If we have inner replica labels we need to resort. - s.mtx.Lock() - needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) - s.mtx.Unlock() - - var resp respSet - if needsEagerRetrival { - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} - } - resp = newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - } else { - resp = newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) - } + resp := newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + nil, + ) mtx.Lock() respSets = append(respSets, resp) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 82d42dd9d7..50ead2c36f 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3484,5 +3484,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) testutil.Equals(t, 2, len(srv.SeriesSet)) } diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index 60722d4b0a..e6cadfbea9 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -11,18 +11,43 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) +type sortingStrategy uint64 + +const ( + sortingStrategyStore sortingStrategy = iota + 1 + sortingStrategyNone +) + // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. type flushableServer interface { storepb.Store_SeriesServer + Flush() error } func newFlushableServer( upstream storepb.Store_SeriesServer, + sortingsortingStrategy sortingStrategy, ) flushableServer { - return &resortingServer{Store_SeriesServer: upstream} + switch sortingsortingStrategy { + case sortingStrategyStore: + return &resortingServer{Store_SeriesServer: upstream} + case sortingStrategyNone: + return &passthroughServer{Store_SeriesServer: upstream} + default: + // should not happen. + panic("unexpected sorting strategy") + } } +// passthroughServer is a flushableServer that forwards all data to +// an upstream server without additional processing. +type passthroughServer struct { + storepb.Store_SeriesServer +} + +func (p *passthroughServer) Flush() error { return nil } + // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. // Data is resorted and sent to an upstream server upon calling Flush. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 4c69df5d4f..308a0741ce 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv) + s := newFlushableServer(seriesSrv, sortingStrategyStore) + extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 757a9fa0d3..51631b388a 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -756,9 +756,9 @@ func newEagerRespSet( // This should be used only for stores that does not support doing this on server side. // See docs/proposals-accepted/20221129-avoid-global-sort.md for details. - if len(l.removeLabels) > 0 { - sortWithoutLabels(l.bufferedResponses, l.removeLabels) - } + // NOTE. Client is not guaranteed to give a sorted response when extLset is added + // Generally we need to resort here. + sortWithoutLabels(l.bufferedResponses, l.removeLabels) }(ret) @@ -785,7 +785,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] continue } - ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + if len(labelsToRemove) > 0 { + ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + } } // With the re-ordered label sets, re-sorting all series aligns the same series diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index f1bd44040d..37badaf1dc 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv) + srv := newFlushableServer(seriesSrv, sortingStrategyStore) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { From 2a10d709396f197bf55167a94dfba1a0da0757f4 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sat, 9 Sep 2023 17:52:37 +0200 Subject: [PATCH 3/4] Store: remove stringset This is the wrong approach to detect if we need to resort. It cannot detect if we might end up with an unsorted series set if we add extLabels. Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 15 ------ cmd/thanos/rule.go | 15 ------ cmd/thanos/sidecar.go | 47 ++-------------- cmd/thanos/store.go | 17 ------ go.mod | 2 - go.sum | 4 -- pkg/receive/multitsdb.go | 13 ----- pkg/store/acceptance_test.go | 2 - pkg/store/bucket.go | 37 ------------- pkg/store/bucket_e2e_test.go | 24 --------- pkg/store/bucket_test.go | 5 -- pkg/store/prometheus.go | 4 -- pkg/store/prometheus_test.go | 7 --- pkg/store/tsdb.go | 41 -------------- pkg/stringset/set.go | 101 ----------------------------------- 16 files changed, 4 insertions(+), 331 deletions(-) delete mode 100644 pkg/stringset/set.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f9811f23d..da6a08fff3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). - [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication +- [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted ### Added diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 8d37f42c23..5b699d264d 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -366,21 +366,6 @@ func runReceive( grpcserver.WithTLSConfig(tlsCfg), ) - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - dbs.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - g.Add( func() error { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2d519185a3..ceadf1159c 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -656,21 +656,6 @@ func runRule( ) storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) - - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - tsdbStore.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } options = append(options, grpcserver.WithServer( diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 968a09ee9c..27cf759b2a 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -48,7 +48,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" ) @@ -113,9 +112,8 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), - labelNamesSet: stringset.AllStrings(), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), } confContentYaml, err := conf.objStore.Content() @@ -239,19 +237,6 @@ func runSidecar( }, func(error) { cancel() }) - - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - m.UpdateLabelNames(context.Background()) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -264,7 +249,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -434,8 +419,6 @@ type promMetadata struct { limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client - - labelNamesSet stringset.Set } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -463,30 +446,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } -func (s *promMetadata) UpdateLabelNames(ctx context.Context) { - mint, _ := s.Timestamps() - labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) - if err != nil { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.labelNamesSet = stringset.AllStrings() - return - } - - filter := stringset.NewFromStrings(labelNames...) - s.mtx.Lock() - s.labelNamesSet = filter - s.mtx.Unlock() -} - -func (s *promMetadata) LabelNamesSet() stringset.Set { - s.mtx.Lock() - defer s.mtx.Unlock() - - return s.labelNamesSet -} - func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 29ac6921a2..73e4b838dd 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -501,23 +501,6 @@ func runStore( }) } - { - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - bs.UpdateLabelNames() - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - - } // Add bucket UI for loaded blocks. { ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) diff --git a/go.mod b/go.mod index b9b2c27d22..404442782e 100644 --- a/go.mod +++ b/go.mod @@ -118,14 +118,12 @@ require ( require ( github.com/onsi/gomega v1.27.10 - github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 ) require ( - github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect diff --git a/go.sum b/go.sum index b18e2af8d2..2f61a3076b 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= @@ -851,8 +849,6 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 4dadb97343..509e9f3535 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab return initialLset, nil } -func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { - t.mtx.RLock() - defer t.mtx.RUnlock() - - for _, tenant := range t.tenants { - db := tenant.storeTSDB - if db == nil { - continue - } - db.UpdateLabelNames(ctx) - } -} - // extendLabels extends external labels of the initial label set. // If an external label shares same name with a label in the initial label set, // use the label in the initial label set and inform user about it. diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 2d5a7aebfd..c22c27bf3c 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -28,7 +28,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -827,7 +826,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() stringset.Set { return stringset.AllStrings() }, func() string { return version }) testutil.Ok(tt, err) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e6ec63d019..54150d178a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -59,7 +59,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -387,9 +386,6 @@ type BucketStore struct { enabledLazyExpandedPostings bool - bmtx sync.Mutex - labelNamesSet stringset.Set - blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -543,7 +539,6 @@ func NewBucketStore( enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, - labelNamesSet: stringset.AllStrings(), } for _, option := range options { @@ -1790,38 +1785,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq }, nil } -func (s *BucketStore) UpdateLabelNames() { - s.mtx.RLock() - defer s.mtx.RUnlock() - - newSet := stringset.New() - for _, b := range s.blocks { - labelNames, err := b.indexHeaderReader.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error()) - s.updateLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range labelNames { - newSet.Insert(l) - } - } - s.updateLabelNamesSet(newSet) -} - -func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) { - s.bmtx.Lock() - s.labelNamesSet = newSet - s.bmtx.Unlock() -} - -func (b *BucketStore) LabelNamesSet() stringset.Set { - b.bmtx.Lock() - defer b.bmtx.Unlock() - - return b.labelNamesSet -} - func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) { // We filter external labels from matchers so we won't try to match series on them. var result []*labels.Matcher diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7262dee155..ebd1ffa709 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { }) } -func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - dir := t.TempDir() - - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) - s.cache.SwapWith(noopCache{}) - - mint, maxt := s.store.TimeRange() - testutil.Equals(t, s.minTime, mint) - testutil.Equals(t, s.maxTime, maxt) - - s.store.UpdateLabelNames() - for _, b := range s.store.blocks { - waitTimeout(t, &b.pendingReaders, 5*time.Second) - } - - filter := s.store.LabelNamesSet() - for _, n := range []string{"a", "b", "c"} { - testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) - } - testutil.Equals(t, 3, filter.Count()) - }) -} - func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { cases := map[string]struct { maxSeriesLimit uint64 diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 50ead2c36f..846fea2e98 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -61,7 +61,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1661,7 +1660,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), - labelNamesSet: stringset.AllStrings(), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -3471,9 +3469,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) - // make sure to have updated inner label names - bucketStore.UpdateLabelNames() - srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{ WithoutReplicaLabels: []string{"replica"}, diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 308a0741ce..fd6d4c0195 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -42,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -54,7 +53,6 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - labelNamesSet func() stringset.Set promVersion func() string timestamps func() (mint int64, maxt int64) @@ -81,7 +79,6 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), - labelNamesSet func() stringset.Set, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -95,7 +92,6 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, - labelNamesSet: labelNamesSet, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 82965672c7..d0597b6e9c 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -26,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -72,7 +71,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) // MaxTime does not matter. testutil.Ok(t, err) @@ -234,7 +232,6 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) testutil.Ok(t, err) @@ -417,7 +414,6 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -481,7 +477,6 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -512,7 +507,6 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) @@ -592,7 +586,6 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 37badaf1dc..b5182f3008 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "sync" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -26,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -46,9 +44,6 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - lmx sync.RWMutex - labelNamesSet stringset.Set - extLset labels.Labels mtx sync.RWMutex } @@ -77,7 +72,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, - labelNamesSet: stringset.AllStrings(), buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -376,38 +370,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } - -func (s *TSDBStore) UpdateLabelNames(ctx context.Context) { - newSet := stringset.New() - q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) - if err != nil { - level.Warn(s.logger).Log("msg", "error creating tsdb querier", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") - - res, _, err := q.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range res { - newSet.Insert(l) - } - s.setLabelNamesSet(newSet) -} - -func (s *TSDBStore) setLabelNamesSet(newSet stringset.Set) { - s.lmx.Lock() - s.labelNamesSet = newSet - s.lmx.Unlock() -} - -func (b *TSDBStore) LabelNamesSet() stringset.Set { - b.lmx.RLock() - defer b.lmx.RUnlock() - - return b.labelNamesSet -} diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go deleted file mode 100644 index 080071570f..0000000000 --- a/pkg/stringset/set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package stringset - -import ( - cuckoo "github.com/seiflotfy/cuckoofilter" -) - -type Set interface { - Has(string) bool - HasAny([]string) bool - // Count returns the number of elements in the set. - // A value of -1 indicates infinite size and can be returned by a - // set representing all possible string values. - Count() int -} - -type fixedSet struct { - cuckoo *cuckoo.Filter -} - -func (f fixedSet) HasAny(strings []string) bool { - for _, s := range strings { - if f.Has(s) { - return true - } - } - return false -} - -func NewFromStrings(items ...string) Set { - f := cuckoo.NewFilter(uint(len(items))) - for _, label := range items { - f.InsertUnique([]byte(label)) - } - - return &fixedSet{cuckoo: f} -} - -func (f fixedSet) Has(s string) bool { - return f.cuckoo.Lookup([]byte(s)) -} - -func (f fixedSet) Count() int { - return int(f.cuckoo.Count()) -} - -type mutableSet struct { - cuckoo *cuckoo.ScalableCuckooFilter -} - -type MutableSet interface { - Set - Insert(string) -} - -func New() MutableSet { - return &mutableSet{ - cuckoo: cuckoo.NewScalableCuckooFilter(), - } -} - -func (e mutableSet) Insert(s string) { - e.cuckoo.InsertUnique([]byte(s)) -} - -func (e mutableSet) Has(s string) bool { - return e.cuckoo.Lookup([]byte(s)) -} - -func (e mutableSet) HasAny(strings []string) bool { - for _, s := range strings { - if e.Has(s) { - return true - } - } - return false -} - -func (e mutableSet) Count() int { - return int(e.cuckoo.Count()) -} - -type allStringsSet struct{} - -func (e allStringsSet) HasAny(_ []string) bool { - return true -} - -func AllStrings() *allStringsSet { - return &allStringsSet{} -} - -func (e allStringsSet) Has(_ string) bool { - return true -} - -func (e allStringsSet) Count() int { - return -1 -} From b928701ea9a9f0e901c576452ada9e988ae6eeba Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 14 Sep 2023 15:56:03 +0200 Subject: [PATCH 4/4] Docs: drop paragraph about deduplication on inner labels Signed-off-by: Michael Hoffmann --- docs/components/query.md | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docs/components/query.md b/docs/components/query.md index 87733ff02d..237eb83794 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -103,16 +103,6 @@ thanos query \ This logic can also be controlled via parameter on QueryAPI. More details below. -### Deduplication on non-external labels - -In `v0.31.0` we have implemented an [optimization](../proposals-accepted/20221129-avoid-global-sort.md) which broke deduplication on non-external labels. We think that it was just a coincidence that deduplication worked at all on non-external labels in previous versions. - -External labels always override any labels a series might have and this makes it so that it is possible to remove replica labels on series returned by a StoreAPI as an optimization. If deduplication happens on internal labels then that might lead to unsorted series from a StoreAPI and that breaks deduplication. - -To fix this use-case, in 0.32.0 we've implemented a cuckoo filter on label names that is updated every 10 seconds. Using it we can detect whether deduplication was requested on internal labels. If that is the case then the series set is resorted before being sent off to the querier. It is strongly recommended to set replica labels which are external labels because otherwise the optimization cannot be applied and your queries will be slower by 20-30%. - -In the future we have plans to expose this cuckoo filter through the InfoAPI. This will allow better scoping queries to StoreAPIs. - ## Experimental PromQL Engine By default, Thanos querier comes with standard Prometheus PromQL engine. However, when `--query.promql-engine=thanos` is specified, Thanos will use [experimental Thanos PromQL engine](http://github.com/thanos-community/promql-engine) which is a drop-in, efficient implementation of PromQL engine with query planner and optimizers.