Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: fix exemplar proxying for receivers with multiple tenants #7326

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup
- [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.

### Added

Expand Down
41 changes: 22 additions & 19 deletions pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -85,28 +86,21 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
for _, st := range s.exemplars() {
queryParts = queryParts[:0]

Matchers:
for _, matchers := range selectors {
matcherSet := make(map[string]struct{})
for _, m := range matchers {
for _, ls := range st.LabelSets {
if lv := ls.Get(m.Name); lv != "" {
if !m.Matches(lv) {
continue Matchers
} else {
// If the current matcher matches one external label,
// we don't add it to the current metric selector
// as Prometheus' Exemplars API cannot handle external labels.
continue
}
}
matcherSet[m.String()] = struct{}{}
}
for _, matcherSet := range selectors {
extLbls := st.LabelSets
if !store.LabelSetsMatch(matcherSet, extLbls...) {
continue
}

labelMatchers = labelMatchers[:0]
for m := range matcherSet {
labelMatchers = append(labelMatchers, m)
for _, m := range matcherSet {
if isExternalLabel(m.Name, extLbls) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should propagate labels matchers here, but check them in tsdb.go. Otherwise all tenants in a receiver will match whenever a single tenant matches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it will fail when proxying via sidecar? Because we are also passing the query as is there. We could leave this as a follow up and maintain behavior for now.

I think the best approach is to cover on all places, but would require ppl upgrading all components if they are running on distributed mode. Otherwise exemplars queries will not match anything.

// If the current matcher matches one external label,
// we don't add it to the current metric selector
// as Prometheus' Exemplars API cannot handle external labels.
continue
}
labelMatchers = append(labelMatchers, m.String())
}

queryParts = append(queryParts, "{"+strings.Join(labelMatchers, ", ")+"}")
Expand Down Expand Up @@ -164,6 +158,15 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
return nil
}

func isExternalLabel(name string, sets []labels.Labels) bool {
pedro-stanaka marked this conversation as resolved.
Show resolved Hide resolved
for _, ls := range sets {
if ls.Get(name) != "" {
return true
}
}
return false
}

func (stream *exemplarsStream) receive(ctx context.Context) error {
exemplars, err := stream.client.Exemplars(ctx, stream.request)
if err != nil {
Expand Down
29 changes: 28 additions & 1 deletion pkg/exemplars/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/status"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -243,6 +244,31 @@ func TestProxy(t *testing.T) {
}),
},
},
{
name: "one external label matches one of the selector labels",
request: &exemplarspb.ExemplarsRequest{
Query: `http_request_duration_bucket{cluster="A"}`,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
clients: []*exemplarspb.ExemplarStore{
{
ExemplarsClient: &testExemplarClient{
response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))},
Exemplars: []*exemplarspb.Exemplar{{Value: 1}},
}),
},
LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "non-matching"}), labels.FromMap(map[string]string{"cluster": "A"})},
},
},
server: &testExemplarServer{},
wantResponses: []*exemplarspb.ExemplarsResponse{
exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))},
Exemplars: []*exemplarspb.Exemplar{{Value: 1}},
}),
},
},
{
name: "external label selects stores",
request: &exemplarspb.ExemplarsRequest{
Expand Down Expand Up @@ -341,12 +367,13 @@ func TestProxy(t *testing.T) {
// for matched response for simplicity.
Outer:
for _, exp := range tc.wantResponses {
var lres *exemplarspb.ExemplarsResponse
for _, res := range tc.server.responses {
if reflect.DeepEqual(exp, res) {
continue Outer
}
}
t.Errorf("miss expected response %v", exp)
t.Errorf("miss expected response %v. got: %v", exp, lres)
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func storeMatches(ctx context.Context, s Client, debugLogging bool, mint, maxt i
}

extLset := s.LabelSets()
if !labelSetsMatch(matchers, extLset...) {
if !LabelSetsMatch(matchers, extLset...) {
if debugLogging {
reason = fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers)
}
Expand All @@ -449,16 +449,16 @@ func storeMatchDebugMetadata(s Client, storeDebugMatchers [][]*labels.Matcher) (

match := false
for _, sm := range storeDebugMatchers {
match = match || labelSetsMatch(sm, labels.FromStrings("__address__", addr))
match = match || LabelSetsMatch(sm, labels.FromStrings("__address__", addr))
}
if !match {
return false, fmt.Sprintf("__address__ %v does not match debug store metadata matchers: %v", addr, storeDebugMatchers)
}
return true, ""
}

// labelSetsMatch returns false if all label-set do not match the matchers (aka: OR is between all label-sets).
func labelSetsMatch(matchers []*labels.Matcher, lset ...labels.Labels) bool {
// LabelSetsMatch returns false if all label-set do not match the matchers (aka: OR is between all label-sets).
func LabelSetsMatch(matchers []*labels.Matcher, lset ...labels.Labels) bool {
if len(lset) == 0 {
return true
}
Expand Down
Loading