Skip to content

Commit

Permalink
perf(chstorage): query labels and label values from deduplicated resource
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 4, 2024
1 parent f5311cc commit 2a8ea8d
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chstorage
import (
"context"
"slices"
"time"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
Expand Down Expand Up @@ -60,15 +61,15 @@ func (q *Querier) LabelNames(ctx context.Context, opts logstorage.LabelsOptions)
}
)
if err := q.do(ctx, selectQuery{
Query: chsql.Select(table,
Query: chsql.SelectFrom(
// Select deduplicated resources from subquery.
q.deduplicatedResource(table, opts.Start, opts.End),
chsql.ResultColumn{
Name: "name",
Expr: chsql.ArrayJoin(attrKeys(colResource)),
Data: &name,
},
).
}).
Distinct(true).
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End)).
Limit(q.labelLimit),
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < name.Rows(); i++ {
Expand Down Expand Up @@ -162,22 +163,24 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
labelName = key
}

resourceQuery := q.deduplicatedResource(table, opts.Start, opts.End)
for _, m := range opts.Query.Matchers {
resourceQuery.Where(q.logQLLabelMatcher(m, mapping))
}
var (
value proto.ColStr
query = chsql.Select(table, chsql.ResultColumn{
Name: "value",
Expr: attrSelector(colResource, labelName),
Data: &value,
}).
query = chsql.SelectFrom(
// Select deduplicated resources from subquery.
resourceQuery,
chsql.ResultColumn{
Name: "value",
Expr: attrSelector(colResource, labelName),
Data: &value,
}).
Distinct(true).
Where(chsql.InTimeRange("timestamp", opts.Start, opts.End))
Order(chsql.Ident("value"), chsql.Asc).
Limit(q.labelLimit)
)
for _, m := range opts.Query.Matchers {
query.Where(q.logQLLabelMatcher(m, mapping))
}
query.Order(chsql.Ident("value"), chsql.Asc).
Limit(q.labelLimit)

if err := q.do(ctx, selectQuery{
Query: query,
OnResult: func(ctx context.Context, block proto.Block) error {
Expand Down Expand Up @@ -390,6 +393,15 @@ func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (re
return result, nil
}

// deduplicatedResource builds a query that selects the resource column from
// table, grouped by that same column and filtered on "timestamp" by the given
// start and end bounds.
//
// Deduplication is expressed with GROUP BY rather than DISTINCT, because
// DISTINCT is not optimized by ClickHouse.
//
// See https://github.com/ClickHouse/ClickHouse/issues/4670
func (q *Querier) deduplicatedResource(table string, start, end time.Time) *chsql.SelectQuery {
	// Base query: only the resource column is projected.
	resources := chsql.Select(table, chsql.Column(colResource, nil))

	// Collapse identical resources into a single row.
	grouped := resources.GroupBy(chsql.Ident(colResource))

	// Restrict the rows considered to the requested time bounds.
	timeFilter := chsql.InTimeRange("timestamp", start, end)
	return grouped.Where(timeFilter)
}

func forEachColMap[K comparable, V any](c *proto.ColMap[K, V], row int, cb func(K, V)) {
var start int
end := int(c.Offsets[row])
Expand Down

0 comments on commit 2a8ea8d

Please sign in to comment.