Skip to content

Commit

Permalink
fix(chstorage): properly handle matchers in LabelNames
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jun 10, 2024
1 parent 8a0c014 commit cdb9ae0
Showing 1 changed file with 169 additions and 22 deletions.
191 changes: 169 additions & 22 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *promQuerier) getEnd(t time.Time) time.Time {
// It is not safe to use the strings beyond the lifetime of the querier.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (_ []string, _ annotations.Annotations, rerr error) {
func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (result []string, _ annotations.Annotations, rerr error) {
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.LabelValues",
trace.WithAttributes(
attribute.String("chstorage.label", labelName),
Expand All @@ -105,6 +105,10 @@ func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matcher
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
span.AddEvent("values_fetched", trace.WithAttributes(
attribute.Int("chstorage.total_values", len(result)),
))
}
span.End()
}()
Expand All @@ -113,7 +117,7 @@ func (p *promQuerier) LabelValues(ctx context.Context, labelName string, matcher
return labelName != m.Name
})
if matchesOtherLabels {
r, err := p.getMatchingLabelValues(ctx, labelName, matchers...)
r, err := p.getMatchingLabelValues(ctx, labelName, matchers)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -187,14 +191,11 @@ func (p *promQuerier) getLabelValues(ctx context.Context, labelName string, matc
}); err != nil {
return nil, err
}
span.AddEvent("values_fetched", trace.WithAttributes(
attribute.Int("chstorage.total_values", len(result)),
))

return result, nil
}

func (p *promQuerier) getMatchingLabelValues(ctx context.Context, labelName string, matchers ...*labels.Matcher) (_ []string, rerr error) {
func (p *promQuerier) getMatchingLabelValues(ctx context.Context, labelName string, matchers []*labels.Matcher) (_ []string, rerr error) {
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.getMatchingLabelValues",
trace.WithAttributes(
attribute.String("chstorage.label", labelName),
Expand Down Expand Up @@ -257,6 +258,7 @@ func (p *promQuerier) getMatchingLabelValues(ctx context.Context, labelName stri
}
query.Where(expr)
}
query.Limit(1000)

if err := p.do(ctx, selectQuery{
Query: query,
Expand All @@ -275,7 +277,6 @@ func (p *promQuerier) getMatchingLabelValues(ctx context.Context, labelName stri
}); err != nil {
return nil, err
}
query.Limit(1000)

return result, nil
}
Expand Down Expand Up @@ -335,32 +336,55 @@ func (p *promQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matche
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
span.AddEvent("names_fetched", trace.WithAttributes(
attribute.Int("chstorage.total_names", len(result)),
))
}
span.End()
}()

var (
value = new(proto.ColStr).LowCardinality()
query = chsql.Select(table, chsql.Column("name_normalized", value)).
Distinct(true)
)
for _, m := range matchers {
expr, err := promQLLabelMatcher(
[]chsql.Expr{chsql.Ident("name_normalized")},
m.Type,
m.Value,
)
var err error
if len(matchers) > 0 {
result, err = p.getMatchingLabelNames(ctx, matchers)
if err != nil {
return nil, nil, err
}
query.Where(expr)
return result, nil, nil
}

result, err = p.getLabelNames(ctx)
if err != nil {
return nil, nil, err
}
return result, nil, nil
}

func (p *promQuerier) getLabelNames(ctx context.Context) (result []string, rerr error) {
table := p.tables.Labels

ctx, span := p.tracer.Start(ctx, "chstorage.metrics.getLabelNames",
trace.WithAttributes(
attribute.String("chstorage.table", table),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

var (
value = new(proto.ColStr).LowCardinality()
query = chsql.Select(table, chsql.Column("name_normalized", value)).
Distinct(true)
)
if err := p.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < value.Rows(); i++ {
result = append(result, value.Row(i))
result = append(result, otelstorage.KeyToLabel(value.Row(i)))
}
return nil
},
Expand All @@ -369,10 +393,133 @@ func (p *promQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matche
Signal: "metrics",
Table: table,
}); err != nil {
return nil, nil, err
return nil, err
}

return result, nil, nil
return result, nil
}

func (p *promQuerier) getMatchingLabelNames(ctx context.Context, matchers []*labels.Matcher) (_ []string, rerr error) {
ctx, span := p.tracer.Start(ctx, "chstorage.metrics.getMatchingLabelNames",
trace.WithAttributes(
xattribute.StringerSlice("chstorage.matchers", matchers),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

mlabels := make([]string, 0, len(matchers))
for _, m := range matchers {
mlabels = append(mlabels, m.Name)
}
mapping, err := p.getLabelMapping(ctx, mlabels)
if err != nil {
return nil, errors.Wrap(err, "get label mapping")
}

query := func(ctx context.Context, table string) (result []string, rerr error) {
var (
value proto.ColStr

query = chsql.Select(table, chsql.ResultColumn{
Name: "values",
Expr: chsql.ArrayJoin(
chsql.ArrayConcat(
attrKeys(colAttrs),
attrKeys(colScope),
attrKeys(colResource),
),
),
Data: &value,
}).
Distinct(true).
Where(chsql.InTimeRange("timestamp", p.mint, p.maxt))
)
for _, m := range matchers {
selectors := []chsql.Expr{
chsql.Ident("name_normalized"),
}
if name := m.Name; name != labels.MetricName {
if mapped, ok := mapping[name]; ok {
name = mapped
}
selectors = []chsql.Expr{
attrSelector(colAttrs, name),
attrSelector(colScope, name),
attrSelector(colResource, name),
}
}

expr, err := promQLLabelMatcher(selectors, m.Type, m.Value)
if err != nil {
return nil, err
}
query.Where(expr)
}
query.Limit(1000)

if err := p.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < value.Rows(); i++ {
result = append(result, otelstorage.KeyToLabel(value.Row(i)))
}
return nil
},

Type: "getMatchingLabelValues",
Signal: "metrics",
Table: table,
}); err != nil {
return nil, err
}

return result, nil
}

var (
points []string
expHists []string
)
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
ctx := grpCtx
table := p.tables.Points

result, err := query(ctx, table)
if err != nil {
return errors.Wrap(err, "query points series")
}
points = result

return nil
})
grp.Go(func() error {
ctx := grpCtx
table := p.tables.ExpHistograms

result, err := query(ctx, table)
if err != nil {
return errors.Wrap(err, "query exponential histogram series")
}
expHists = result

return nil
})
if err := grp.Wait(); err != nil {
return nil, err
}

points = append(points, labels.MetricName)
points = append(points, expHists...)
slices.Sort(points)
points = slices.Clip(points)

return points, nil
}

func promQLLabelMatcher(valueSel []chsql.Expr, typ labels.MatchType, value string) (e chsql.Expr, rerr error) {
Expand Down

0 comments on commit cdb9ae0

Please sign in to comment.