Skip to content

Commit

Permalink
Fix multi-tenant exemplar matchers
Browse files Browse the repository at this point in the history
The exemplar proxy synthesizes a query based on PromQL expression matchers
and individual store's label sets. When a store has multiple label sets
with same label names but different values (e.g. multitenant Receivers),
each exemplar matcher will be repeated once for each label set. Because of this,
a receiver hosting 200 tenants can get the same exemplar matcher 200 times. This leads
to the underlying stores slowing down and timing out when asked for exemplars.

This commit modifies the exemplar proxy to deduplicate matchers and only send
a matcher once to an underlying store.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Jul 30, 2022
1 parent fd275f8 commit f9c63ed
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 19 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed
- [#5502](https://github.com/thanos-io/thanos/pull/5502) Receive: Handle exemplar storage errors as conflict error.
- [#5534](https://github.com/thanos-io/thanos/pull/5534) Query: Set struct return by query api alerts same as prometheus api
- [#5534](https://github.com/thanos-io/thanos/pull/5534) Query: Set struct return by query api alerts same as prometheus api.
- [#5554](https://github.com/thanos-io/thanos/pull/5534) Query/Receiver: Fix querying exemplars from multi-tenant receivers.

### Added

Expand Down
32 changes: 17 additions & 15 deletions pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exemplars
import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -80,10 +81,11 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
)

for _, st := range s.exemplars() {
query := ""
var queryParts []string

Matchers:
for _, matchers := range selectors {
metricsSelector := ""
matcherSet := make(map[string]struct{})
for _, m := range matchers {
for _, ls := range st.LabelSets {
if lv := ls.Get(m.Name); lv != "" {
Expand All @@ -96,27 +98,27 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
continue
}
}
if metricsSelector == "" {
metricsSelector += m.String()
} else {
metricsSelector += ", " + m.String()
}
matcherSet[m.String()] = struct{}{}
}
}
// Construct the query by concatenating metric selectors with '+'.
// We cannot preserve the original query info, but the returned
// results are the same.
if query == "" {
query += "{" + metricsSelector + "}"
} else {
query += " + {" + metricsSelector + "}"

matchers := make([]string, 0, len(matcherSet))
for m := range matcherSet {
matchers = append(matchers, m)
}

queryParts = append(queryParts, "{"+strings.Join(matchers, ", ")+"}")
}

// No matchers match this store.
if query == "" {
if len(queryParts) == 0 {
continue
}

// Construct the query by concatenating metric selectors with '+'.
// We cannot preserve the original query info, but the returned
// results are the same.
query := strings.Join(queryParts, "+")
r := &exemplarspb.ExemplarsRequest{
Start: req.Start,
End: req.End,
Expand Down
66 changes: 63 additions & 3 deletions pkg/exemplars/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ package exemplars

import (
"context"
"fmt"
"io"
"os"
"reflect"
"sync"
"testing"

"github.com/prometheus/prometheus/promql/parser"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -49,9 +54,33 @@ func (t *testExemplarClient) Recv() (*exemplarspb.ExemplarsResponse, error) {
}

func (t *testExemplarClient) Exemplars(ctx context.Context, in *exemplarspb.ExemplarsRequest, opts ...grpc.CallOption) (exemplarspb.Exemplars_ExemplarsClient, error) {
expr, err := parser.ParseExpr(in.Query)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if err := t.assertUniqueMatchers(expr); err != nil {
return nil, err
}

return t, t.exemplarErr
}

func (t *testExemplarClient) assertUniqueMatchers(expr parser.Expr) error {
matchersList := parser.ExtractSelectors(expr)
for _, matchers := range matchersList {
matcherSet := make(map[string]struct{})
for _, matcher := range matchers {
if _, ok := matcherSet[matcher.String()]; ok {
return status.Error(codes.Internal, fmt.Sprintf("duplicate matcher set found %s", matcher))
}
matcherSet[matcher.String()] = struct{}{}
}
}

return nil
}

var _ exemplarspb.ExemplarsClient = &testExemplarClient{}

type testExemplarServer struct {
Expand Down Expand Up @@ -94,7 +123,7 @@ func TestProxy(t *testing.T) {
{
name: "proxy success",
request: &exemplarspb.ExemplarsRequest{
Query: "http_request_duration_bucket",
Query: `http_request_duration_bucket`,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
clients: []*exemplarspb.ExemplarStore{
Expand All @@ -105,7 +134,38 @@ func TestProxy(t *testing.T) {
Exemplars: []*exemplarspb.Exemplar{{Value: 1}},
}),
},
LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})},
LabelSets: []labels.Labels{
labels.FromMap(map[string]string{"cluster": "A"}),
labels.FromMap(map[string]string{"cluster": "B"}),
},
},
},
server: &testExemplarServer{},
wantResponses: []*exemplarspb.ExemplarsResponse{
exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))},
Exemplars: []*exemplarspb.Exemplar{{Value: 1}},
}),
},
},
{
name: "proxy success with multiple selectors",
request: &exemplarspb.ExemplarsRequest{
Query: `http_request_duration_bucket{region="us-east1"} / on (region) group_left() http_request_duration_bucket`,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
clients: []*exemplarspb.ExemplarStore{
{
ExemplarsClient: &testExemplarClient{
response: exemplarspb.NewExemplarsResponse(&exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(map[string]string{"__name__": "http_request_duration_bucket"}))},
Exemplars: []*exemplarspb.Exemplar{{Value: 1}},
}),
},
LabelSets: []labels.Labels{
labels.FromMap(map[string]string{"cluster": "A"}),
labels.FromMap(map[string]string{"cluster": "B"}),
},
},
},
server: &testExemplarServer{},
Expand All @@ -119,7 +179,7 @@ func TestProxy(t *testing.T) {
{
name: "warning proxy success",
request: &exemplarspb.ExemplarsRequest{
Query: "http_request_duration_bucket",
Query: `http_request_duration_bucket`,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
},
clients: []*exemplarspb.ExemplarStore{
Expand Down

0 comments on commit f9c63ed

Please sign in to comment.