Skip to content

Commit

Permalink
fix(chstorage): use chsql for getMetricsLabelMapping too
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jun 7, 2024
1 parent 6604bc7 commit 7be284b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion internal/chstorage/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (q *Querier) do(ctx context.Context, s selectQuery) error {

queryStartTime := time.Now()
if err := q.ch.Do(ctx, query); err != nil {
return err
return errors.Wrapf(err, "execute %s (signal: %s)", s.Type, s.Signal)
}

q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(),
Expand Down
2 changes: 1 addition & 1 deletion internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (q *Querier) getLabelMapping(ctx context.Context, labels []string) (_ map[s
{Name: "name", Data: &inputData},
},

Type: "LabelMapping",
Type: "getLabelMapping",
Signal: "logs",
Table: table,
}); err != nil {
Expand Down
11 changes: 4 additions & 7 deletions internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,21 @@ func (v *LogsQuery[E]) Execute(ctx context.Context, q *Querier) (_ iterators.Ite
if err := q.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
if err := out.ForEach(func(r logstorage.Record) error {
return out.ForEach(func(r logstorage.Record) error {
e, err := v.Mapper(r)
if err != nil {
return err
}
data = append(data, e)
return nil
}); err != nil {
return errors.Wrap(err, "for each")
}
return nil
})
},

Type: "QueryLogs",
Signal: "logs",
Table: table,
}); err != nil {
return nil, errors.Wrap(err, "execute LogsQuery")
return nil, err
}

return iterators.Slice(data), nil
Expand Down Expand Up @@ -262,7 +259,7 @@ func (v *SampleQuery) Execute(ctx context.Context, q *Querier) (_ logqlengine.Sa
Signal: "logs",
Table: table,
}); err != nil {
return nil, errors.Wrap(err, "execute SampleQuery")
return nil, err
}

return iterators.Slice(result), nil
Expand Down
36 changes: 24 additions & 12 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package chstorage

import (
"context"
"fmt"
"slices"
"strconv"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -214,9 +212,12 @@ func promQLLabelMatcher(valueSel []chsql.Expr, typ labels.MatchType, value strin
}

func (q *Querier) getMetricsLabelMapping(ctx context.Context, input []string) (_ map[string]string, rerr error) {
table := q.tables.Labels

ctx, span := q.tracer.Start(ctx, "chstorage.metrics.getMetricsLabelMapping",
trace.WithAttributes(
attribute.StringSlice("chstorage.labels", input),
attribute.String("chstorage.table", table),
),
)
defer func() {
Expand All @@ -227,20 +228,28 @@ func (q *Querier) getMetricsLabelMapping(ctx context.Context, input []string) (_
}()

var (
out = make(map[string]string, len(input))

name = new(proto.ColStr).LowCardinality()
normalized = new(proto.ColStr).LowCardinality()

query = chsql.Select(table,
chsql.Column("name", name),
chsql.Column("name_normalized", normalized),
).
Where(chsql.In(
chsql.Ident("name_normalized"),
chsql.Ident("labels"),
))
)

var (
out = make(map[string]string, len(input))
inputData proto.ColStr
)
var inputData proto.ColStr
for _, label := range input {
inputData.Append(label)
}
if err := q.ch.Do(ctx, ch.Query{
Result: proto.Results{
{Name: "name", Data: name},
{Name: "name_normalized", Data: normalized},
},
if err := q.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < normalized.Rows(); i++ {
out[normalized.Row(i)] = name.Row(i)
Expand All @@ -251,9 +260,12 @@ func (q *Querier) getMetricsLabelMapping(ctx context.Context, input []string) (_
ExternalData: []proto.InputColumn{
{Name: "name", Data: &inputData},
},
Body: fmt.Sprintf(`SELECT name, name_normalized FROM %[1]s WHERE name_normalized IN labels`, q.tables.Labels),

Type: "getMetricsLabelMapping",
Signal: "metrics",
Table: table,
}); err != nil {
return nil, errors.Wrap(err, "select")
return nil, err
}
span.AddEvent("labels_fetched", trace.WithAttributes(
xattribute.StringMap("chstorage.mapping", out),
Expand Down

0 comments on commit 7be284b

Please sign in to comment.