Skip to content

Commit

Permalink
Copy labels
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Jul 27, 2023
1 parent ec0d2c8 commit 66dd816
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 62 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesBloom, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesBloom() stringset.Set {
func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
}

if s.labelNamesSet.TestAny(req.WithoutReplicaLabels) {
if s.labelNamesSet.HasAny(req.WithoutReplicaLabels) {
rs := &resortingServer{Store_SeriesServer: srv}
defer rs.Flush()
srv = rs
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func TestBucketStore_LabelNamesSet_e2e(t *testing.T) {

filter := s.store.LabelNamesSet()
for _, n := range []string{"a", "b", "c", "ext1", "ext2"} {
testutil.Assert(t, filter.Test(n))
testutil.Assert(t, filter.Has(n))
}
})
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
}

labelNames := p.labelNamesSet()
if labelNames.TestAny(r.WithoutReplicaLabels) {
if labelNames.HasAny(r.WithoutReplicaLabels) {
level.Debug(p.logger).Log("msg", "resorting series due to internal label dedup")
rs := &resortingServer{Store_SeriesServer: s}
defer rs.Flush()
s = rs
Expand Down Expand Up @@ -467,9 +468,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(
}

r := storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(
completeLabelset,
),
Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
Chunks: thanosChks,
})
if err := s.Send(r); err != nil {
Expand Down
52 changes: 52 additions & 0 deletions pkg/store/resorting_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package store

import (
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

type resortingServer struct {
storepb.Store_SeriesServer
series []*storepb.Series
}

func (r *resortingServer) Send(response *storepb.SeriesResponse) error {
if response.GetSeries() == nil {
return r.Store_SeriesServer.Send(response)
}

series := response.GetSeries()
lblsCopy := make([]labelpb.ZLabel, 0, len(series.Labels))
for _, l := range series.Labels {
lblsCopy = append(lblsCopy, labelpb.ZLabel{
Name: copyString(l.Name),
Value: copyString(l.Value),
})
}
series.Labels = lblsCopy

r.series = append(r.series, series)
return nil
}

func copyString(s string) string {
return string([]byte(s))
}

func (r *resortingServer) Flush() error {
slices.SortFunc(r.series, func(a, b *storepb.Series) bool {
return labels.Compare(
labelpb.ZLabelsToPromLabels(a.Labels),
labelpb.ZLabelsToPromLabels(b.Labels),
) < 0
})
for _, response := range r.series {
if err := r.Store_SeriesServer.Send(storepb.NewSeriesResponse(response)); err != nil {
return err
}
}
return nil
}
40 changes: 0 additions & 40 deletions pkg/store/sorting_server.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
extLsetToRemove[lbl] = struct{}{}
}

if s.LabelNamesSet().TestAny(r.WithoutReplicaLabels) {
if s.LabelNamesSet().HasAny(r.WithoutReplicaLabels) {
rs := &resortingServer{Store_SeriesServer: srv}
defer rs.Flush()
srv = rs
Expand Down
26 changes: 13 additions & 13 deletions pkg/stringset/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
)

type Set interface {
Test(string) bool
TestAny([]string) bool
Has(string) bool
HasAny([]string) bool
}

type fixedSet struct {
cuckoo *cuckoo.Filter
}

func (f fixedSet) TestAny(strings []string) bool {
func (f fixedSet) HasAny(strings []string) bool {
for _, s := range strings {
if f.Test(s) {
if f.Has(s) {
return true
}
}
Expand All @@ -34,11 +34,11 @@ func NewFromStrings(items ...string) Set {
return &fixedSet{cuckoo: f}
}

func (f fixedSet) Test(s string) bool {
func (f fixedSet) Has(s string) bool {
return f.cuckoo.Lookup([]byte(s))
}

type elasticSet struct {
type mutableSet struct {
cuckoo *cuckoo.ScalableCuckooFilter
}

Expand All @@ -48,22 +48,22 @@ type MutableSet interface {
}

func New() MutableSet {
return &elasticSet{
return &mutableSet{
cuckoo: cuckoo.NewScalableCuckooFilter(),
}
}

func (e elasticSet) Insert(s string) {
func (e mutableSet) Insert(s string) {
e.cuckoo.Insert([]byte(s))
}

func (e elasticSet) Test(s string) bool {
func (e mutableSet) Has(s string) bool {
return e.cuckoo.Lookup([]byte(s))
}

func (e elasticSet) TestAny(strings []string) bool {
func (e mutableSet) HasAny(strings []string) bool {
for _, s := range strings {
if e.Test(s) {
if e.Has(s) {
return true
}
}
Expand All @@ -72,14 +72,14 @@ func (e elasticSet) TestAny(strings []string) bool {

type allStringsSet struct{}

func (e allStringsSet) TestAny(strings []string) bool {
func (e allStringsSet) HasAny(strings []string) bool {
return true
}

func AllStrings() *allStringsSet {
return &allStringsSet{}
}

func (e allStringsSet) Test(s string) bool {
func (e allStringsSet) Has(_ string) bool {
return true
}

0 comments on commit 66dd816

Please sign in to comment.