diff --git a/pkg/losertree/tree.go b/pkg/losertree/tree.go new file mode 100644 index 0000000000..d0194d35ec --- /dev/null +++ b/pkg/losertree/tree.go @@ -0,0 +1,161 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original version copyright Bryan Boreham, 2024. +// https://github.com/bboreham/go-loser/tree/any. +// Loser tree, from https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree + +package losertree + +type Sequence interface { + Next() bool // Advances and returns true if there is a value at this new position. +} + +func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E, E) bool, close func(S)) *Tree[E, S] { + nSequences := len(sequences) + t := Tree[E, S]{ + maxVal: maxVal, + at: at, + less: less, + close: close, + nodes: make([]node[E, S], nSequences*2), + } + for i, s := range sequences { + t.nodes[i+nSequences].items = s + t.moveNext(i + nSequences) // Must call Next on each item so that At() has a value. + } + if nSequences > 0 { + t.nodes[0].index = -1 // flag to be initialized on first call to Next(). + } + return &t +} + +// Call the close function on all sequences that are still open. +func (t *Tree[E, S]) Close() { + for _, e := range t.nodes[len(t.nodes)/2 : len(t.nodes)] { + if e.index == -1 { + continue + } + t.close(e.items) + } +} + +// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2. +// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1. +// Node 0 is a special node, containing the winner of the contest. +type Tree[E any, S Sequence] struct { + maxVal E + at func(S) E + less func(E, E) bool + close func(S) // Called when Next() returns false. + nodes []node[E, S] +} + +type node[E any, S Sequence] struct { + index int // This is the loser for all nodes except the 0th, where it is the winner. + value E // Value copied from the loser node, or winner for node 0. + items S // Only populated for leaf nodes. +} + +func (t *Tree[E, S]) moveNext(index int) bool { + n := &t.nodes[index] + if n.items.Next() { + n.value = t.at(n.items) + return true + } + t.close(n.items) // Next() returned false; close it and mark as finished. + n.value = t.maxVal + n.index = -1 + return false +} + +func (t *Tree[E, S]) Winner() S { + return t.nodes[t.nodes[0].index].items +} + +func (t *Tree[E, S]) At() E { + return t.nodes[0].value +} + +func (t *Tree[E, S]) Next() bool { + nodes := t.nodes + if len(nodes) == 0 { + return false + } + if nodes[0].index == -1 { // If tree has not been initialized yet, do that. + t.initialize() + return nodes[nodes[0].index].index != -1 + } + if nodes[nodes[0].index].index == -1 { // already exhausted. + return false + } + t.moveNext(nodes[0].index) + t.replayGames(nodes[0].index) + return nodes[nodes[0].index].index != -1 +} + +// Current winner has been advanced independently; fix up the loser tree. +func (t *Tree[E, S]) Fix(closed bool) { + nodes := t.nodes + cur := &nodes[nodes[0].index] + if closed { + cur.value = t.maxVal + cur.index = -1 + } else { + cur.value = t.at(cur.items) + } + t.replayGames(nodes[0].index) +} + +func (t *Tree[E, S]) IsEmpty() bool { + nodes := t.nodes + if nodes[0].index == -1 { // If tree has not been initialized yet, do that. + t.initialize() + } + return nodes[nodes[0].index].index == -1 +} + +func (t *Tree[E, S]) initialize() { + winner := t.playGame(1) + t.nodes[0].index = winner + t.nodes[0].value = t.nodes[winner].value +} + +// Find the winner at position pos; if it is a non-leaf node, store the loser. +// pos must be >= 1 and < len(t.nodes). +func (t *Tree[E, S]) playGame(pos int) int { + nodes := t.nodes + if pos >= len(nodes)/2 { + return pos + } + left := t.playGame(pos * 2) + right := t.playGame(pos*2 + 1) + var loser, winner int + if t.less(nodes[left].value, nodes[right].value) { + loser, winner = right, left + } else { + loser, winner = left, right + } + nodes[pos].index = loser + nodes[pos].value = nodes[loser].value + return winner +} + +// Starting at pos, which is a winner, re-consider all values up to the root. +func (t *Tree[E, S]) replayGames(pos int) { + nodes := t.nodes + winningValue := nodes[pos].value + for n := parent(pos); n != 0; n = parent(n) { + node := &nodes[n] + if t.less(node.value, winningValue) { + // Record pos as the loser here, and the old loser is the new winner. + node.index, pos = pos, node.index + node.value, winningValue = winningValue, node.value + } + } + // pos is now the winner; store it in node 0. + nodes[0].index = pos + nodes[0].value = winningValue +} + +func parent(i int) int { return i >> 1 } diff --git a/pkg/losertree/tree_test.go b/pkg/losertree/tree_test.go new file mode 100644 index 0000000000..4144a81be4 --- /dev/null +++ b/pkg/losertree/tree_test.go @@ -0,0 +1,124 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original version copyright Bryan Boreham, 2024. +// https://github.com/bboreham/go-loser/tree/any. +package losertree + +import ( + "math" + "testing" +) + +type List struct { + list []uint64 + cur uint64 +} + +func NewList(list ...uint64) *List { + return &List{list: list} +} + +func (it *List) At() uint64 { + return it.cur +} + +func (it *List) Next() bool { + if len(it.list) > 0 { + it.cur = it.list[0] + it.list = it.list[1:] + return true + } + it.cur = 0 + return false +} + +func (it *List) Seek(val uint64) bool { + for it.cur < val && len(it.list) > 0 { + it.cur = it.list[0] + it.list = it.list[1:] + } + return len(it.list) > 0 +} + +func checkIterablesEqual[E any, S1 Sequence, S2 Sequence](t *testing.T, a S1, b S2, at1 func(S1) E, at2 func(S2) E, less func(E, E) bool) { + t.Helper() + count := 0 + for a.Next() { + count++ + if !b.Next() { + t.Fatalf("b ended before a after %d elements", count) + } + if less(at1(a), at2(b)) || less(at2(b), at1(a)) { + t.Fatalf("position %d: %v != %v", count, at1(a), at2(b)) + } + } + if b.Next() { + t.Fatalf("a ended before b after %d elements", count) + } +} + +var testCases = []struct { + name string + args []*List + want *List +}{ + { + name: "empty input", + want: NewList(), + }, + { + name: "one list", + args: []*List{NewList(1, 2, 3, 4)}, + want: NewList(1, 2, 3, 4), + }, + { + name: "two lists", + args: []*List{NewList(3, 4, 5), NewList(1, 2)}, + want: NewList(1, 2, 3, 4, 5), + }, + { + name: "two lists, first empty", + args: []*List{NewList(), NewList(1, 2)}, + want: NewList(1, 2), + }, + { + name: "two lists, second empty", + args: []*List{NewList(1, 2), NewList()}, + want: NewList(1, 2), + }, + { + name: "two lists b", + args: []*List{NewList(1, 2), NewList(3, 4, 5)}, + want: NewList(1, 2, 3, 4, 5), + }, + { + name: "two lists c", + args: []*List{NewList(1, 3), NewList(2, 4, 5)}, + want: NewList(1, 2, 3, 4, 5), + }, + { + name: "three lists", + args: []*List{NewList(1, 3), NewList(2, 4), NewList(5)}, + want: NewList(1, 2, 3, 4, 5), + }, +} + +func TestMerge(t *testing.T) { + at := func(s *List) uint64 { return s.At() } + less := func(a, b uint64) bool { return a < b } + at2 := func(s *Tree[uint64, *List]) uint64 { return s.Winner().At() } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + numCloses := 0 + closeFn := func(_ *List) { + numCloses++ + } + lt := New(tt.args, math.MaxUint64, at, less, closeFn) + checkIterablesEqual(t, tt.want, lt, at, at2, less) + if numCloses != len(tt.args) { + t.Errorf("Expected %d closes, got %d", len(tt.args), numCloses) + } + }) + } +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1951c217bf..be0c678727 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1640,13 +1640,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { - defer func() { - for _, resp := range respSets { - resp.Close() - } - }() begin := time.Now() - set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...)) + set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...)) for set.Next() { at := set.At() warn := at.GetWarning() diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 666f130ece..38ec4ba780 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -380,7 +380,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";")) - respHeap := NewDedupResponseHeap(NewProxyResponseHeap(storeResponses...)) + respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...)) for respHeap.Next() { resp := respHeap.At() diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_merge.go similarity index 86% rename from pkg/store/proxy_heap.go rename to pkg/store/proxy_merge.go index e77628c7c2..0235958b4f 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_merge.go @@ -4,7 +4,6 @@ package store import ( - "container/heap" "context" "fmt" "io" @@ -21,13 +20,14 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/losertree" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) -type dedupResponseHeap struct { - h *ProxyResponseHeap +type responseDeduplicator struct { + h *losertree.Tree[*storepb.SeriesResponse, respSet] bufferedSameSeries []*storepb.SeriesResponse @@ -38,22 +38,22 @@ type dedupResponseHeap struct { ok bool } -// NewDedupResponseHeap returns a wrapper around ProxyResponseHeap that merged duplicated series messages into one. +// NewResponseDeduplicator returns a wrapper around a loser tree that merges duplicated series messages into one. // It also deduplicates identical chunks identified by the same checksum from each series message. -func NewDedupResponseHeap(h *ProxyResponseHeap) *dedupResponseHeap { +func NewResponseDeduplicator(h *losertree.Tree[*storepb.SeriesResponse, respSet]) *responseDeduplicator { ok := h.Next() var prev *storepb.SeriesResponse if ok { prev = h.At() } - return &dedupResponseHeap{ + return &responseDeduplicator{ h: h, ok: ok, prev: prev, } } -func (d *dedupResponseHeap) Next() bool { +func (d *responseDeduplicator) Next() bool { if d.buffRespI+1 < len(d.bufferedResp) { d.buffRespI++ return true @@ -153,105 +153,43 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb }) } -func (d *dedupResponseHeap) At() *storepb.SeriesResponse { +func (d *responseDeduplicator) At() *storepb.SeriesResponse { return d.bufferedResp[d.buffRespI] } -// ProxyResponseHeap is a heap for storepb.SeriesSets. -// It performs k-way merge between all of those sets. -// TODO(GiedriusS): can be improved with a tournament tree. -// This is O(n*logk) but can be Theta(n*logk). However, -// tournament trees need n-1 auxiliary nodes so there -// might not be much of a difference. -type ProxyResponseHeap struct { - nodes []ProxyResponseHeapNode -} - -func (h *ProxyResponseHeap) Less(i, j int) bool { - iResp := h.nodes[i].rs.At() - jResp := h.nodes[j].rs.At() - - if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) - jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - - return labels.Compare(iLbls, jLbls) < 0 - } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { - return true - } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { - return false - } - - // If it is not a series then the order does not matter. What matters - // is that we get different types of responses one after another. - return false -} - -func (h *ProxyResponseHeap) Len() int { - return len(h.nodes) -} - -func (h *ProxyResponseHeap) Swap(i, j int) { - h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] -} - -func (h *ProxyResponseHeap) Push(x interface{}) { - h.nodes = append(h.nodes, x.(ProxyResponseHeapNode)) -} - -func (h *ProxyResponseHeap) Pop() (v interface{}) { - h.nodes, v = h.nodes[:h.Len()-1], h.nodes[h.Len()-1] - return -} - -func (h *ProxyResponseHeap) Empty() bool { - return h.Len() == 0 -} - -func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode { - return &h.nodes[0] -} - -type ProxyResponseHeapNode struct { - rs respSet -} - -// NewProxyResponseHeap returns heap that k-way merge series together. +// NewProxyResponseLoserTree returns heap that k-way merge series together. // It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order. -func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap { - ret := ProxyResponseHeap{ - nodes: make([]ProxyResponseHeapNode, 0, len(seriesSets)), - } +func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.SeriesResponse, respSet] { + var maxVal *storepb.SeriesResponse = storepb.NewSeriesResponse(nil) - for _, ss := range seriesSets { - if ss.Empty() { - continue + less := func(a, b *storepb.SeriesResponse) bool { + if a == maxVal && b != maxVal { + return false } - ss := ss - ret.Push(ProxyResponseHeapNode{rs: ss}) - } - - heap.Init(&ret) - - return &ret -} - -func (h *ProxyResponseHeap) Next() bool { - return !h.Empty() -} - -func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { - min := h.Min().rs - - atResp := min.At() + if a != maxVal && b == maxVal { + return true + } + if a == maxVal && b == maxVal { + return true + } + if a.GetSeries() != nil && b.GetSeries() != nil { + iLbls := labelpb.ZLabelsToPromLabels(a.GetSeries().Labels) + jLbls := labelpb.ZLabelsToPromLabels(b.GetSeries().Labels) - if min.Next() { - heap.Fix(h, 0) - } else { - heap.Remove(h, 0) + return labels.Compare(iLbls, jLbls) < 0 + } else if a.GetSeries() == nil && b.GetSeries() != nil { + return true + } else if a.GetSeries() != nil && b.GetSeries() == nil { + return false + } + return false } - return atResp + return losertree.New[*storepb.SeriesResponse, respSet](seriesSets, maxVal, func(s respSet) *storepb.SeriesResponse { + return s.At() + }, less, func(s respSet) { + s.Close() + }) } func (l *lazyRespSet) StoreID() string { @@ -320,6 +258,8 @@ func (l *lazyRespSet) Next() bool { l.bufferedResponsesMtx.Lock() defer l.bufferedResponsesMtx.Unlock() + l.initialized = true + if l.noMoreData && len(l.bufferedResponses) == 0 { l.lastResp = nil @@ -335,7 +275,9 @@ func (l *lazyRespSet) Next() bool { if len(l.bufferedResponses) > 0 { l.lastResp = l.bufferedResponses[0] - l.bufferedResponses = l.bufferedResponses[1:] + if l.initialized { + l.bufferedResponses = l.bufferedResponses[1:] + } return true } @@ -344,14 +286,10 @@ func (l *lazyRespSet) Next() bool { } func (l *lazyRespSet) At() *storepb.SeriesResponse { - // We need to wait for at least one response so that we would be able to properly build the heap. if !l.initialized { - l.Next() - l.initialized = true - return l.lastResp + panic("please call Next before At") } - // Next() was called previously. return l.lastResp } @@ -803,7 +741,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { - l.closeSeries() + if l.closeSeries != nil { + l.closeSeries() + } l.shardMatcher.Close() } @@ -814,7 +754,7 @@ func (l *eagerRespSet) At() *storepb.SeriesResponse { return nil } - return l.bufferedResponses[l.i] + return l.bufferedResponses[l.i-1] } func (l *eagerRespSet) Next() bool { @@ -822,7 +762,7 @@ func (l *eagerRespSet) Next() bool { l.i++ - return l.i < len(l.bufferedResponses) + return l.i <= len(l.bufferedResponses) } func (l *eagerRespSet) Empty() bool { diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_merge_test.go similarity index 88% rename from pkg/store/proxy_heap_test.go rename to pkg/store/proxy_merge_test.go index 0400bd8915..f72ba4a4ef 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_merge_test.go @@ -4,6 +4,7 @@ package store import ( + "fmt" "sync" "testing" @@ -23,7 +24,7 @@ func TestRmLabelsCornerCases(t *testing.T) { }), labels.Labels{}) } -func TestProxyResponseHeapSort(t *testing.T) { +func TestProxyResponseTreeSort(t *testing.T) { for _, tcase := range []struct { title string input []respSet @@ -33,14 +34,16 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "merge sets with different series and common labels", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), }, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), @@ -58,7 +61,8 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "merge sets with different series and labels", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), @@ -66,7 +70,8 @@ func TestProxyResponseHeapSort(t *testing.T) { }, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), @@ -85,7 +90,8 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "merge repeated series in stores with different external labels", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -93,7 +99,8 @@ func TestProxyResponseHeapSort(t *testing.T) { storeLabels: map[string]struct{}{"ext2": {}}, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -112,7 +119,8 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "merge series with external labels at beginning of series", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), @@ -120,7 +128,8 @@ func TestProxyResponseHeapSort(t *testing.T) { storeLabels: map[string]struct{}{"a": {}}, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), @@ -139,7 +148,8 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -147,7 +157,8 @@ func TestProxyResponseHeapSort(t *testing.T) { storeLabels: map[string]struct{}{"ext2": {}, "replica": {}}, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -166,7 +177,8 @@ func TestProxyResponseHeapSort(t *testing.T) { title: "test", input: []respSet{ &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -175,7 +187,8 @@ func TestProxyResponseHeapSort(t *testing.T) { storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, }, &eagerRespSet{ - wg: &sync.WaitGroup{}, + closeSeries: func() {}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -195,14 +208,13 @@ func TestProxyResponseHeapSort(t *testing.T) { }, } { t.Run(tcase.title, func(t *testing.T) { - h := NewProxyResponseHeap(tcase.input...) - if !h.Empty() { - got := []*storepb.SeriesResponse{h.At()} - for h.Next() { - got = append(got, h.At()) - } - testutil.Equals(t, tcase.exp, got) + h := NewProxyResponseLoserTree(tcase.input...) + got := []*storepb.SeriesResponse{} + for h.Next() { + r := h.At() + got = append(got, r) } + testutil.Equals(t, tcase.exp, got) }) } } @@ -322,3 +334,29 @@ func BenchmarkSortWithoutLabels(b *testing.B) { sortWithoutLabels(resps, labelsToRemove) } } + +func BenchmarkKWayMerge(b *testing.B) { + for i := 0; i < b.N; i++ { + respSets := []respSet{} + for j := 0; j < 1000; j++ { + respSets = append(respSets, &eagerRespSet{ + closeSeries: func() {}, + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3", "d", "4")), + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "4")), + }, + }) + } + lt := NewProxyResponseLoserTree(respSets...) + + got := []*storepb.SeriesResponse{} + for lt.Next() { + r := lt.At() + got = append(got, r) + } + + var _ = got + } +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 966f3bb39f..c4571de976 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2194,13 +2194,13 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { for _, tcase := range []struct { responses []*storepb.SeriesResponse - testFn func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) + testFn func(responses []*storepb.SeriesResponse, h *responseDeduplicator) tname string }{ { tname: "edge case with zero responses", responses: []*storepb.SeriesResponse{}, - testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { + testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { testutil.Equals(t, false, h.Next()) callAtExpectPanic := func() { @@ -2232,7 +2232,7 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, }, }, - testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { + testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { testutil.Equals(t, true, h.Next()) resp := h.At() testutil.Equals(t, responses[0], resp) @@ -2274,7 +2274,7 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, }, }, - testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { + testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { testutil.Equals(t, true, h.Next()) resp := h.At() testutil.Equals(t, responses[0], resp) @@ -2283,8 +2283,9 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, } { t.Run(tcase.tname, func(t *testing.T) { - h := NewDedupResponseHeap(NewProxyResponseHeap( + h := NewResponseDeduplicator(NewProxyResponseLoserTree( &eagerRespSet{ + closeSeries: func() {}, wg: &sync.WaitGroup{}, bufferedResponses: tcase.responses, }, diff --git a/pkg/store/storepb/shard_info.go b/pkg/store/storepb/shard_info.go index e69617dd2f..28d559b49a 100644 --- a/pkg/store/storepb/shard_info.go +++ b/pkg/store/storepb/shard_info.go @@ -29,6 +29,9 @@ func (s *ShardMatcher) IsSharded() bool { } func (s *ShardMatcher) Close() { + if s == nil { + return + } if s.buffers != nil { s.buffers.Put(s.buf) } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 79c5e07233..bc3442d04f 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1260,8 +1260,8 @@ func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) { }) // Use greater or equal to handle flakiness. - testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) - testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(1), []string{"thanos_bucket_store_lazy_expanded_postings_total"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, s2.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_bucket_store_lazy_expanded_postings_total"}, e2emon.WaitMissingMetrics())) } var labelSetsComparer = cmp.Comparer(func(x, y []map[string]string) bool {