Skip to content

Commit

Permalink
fix(chstorage): store normalized metric name to query by
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jan 9, 2024
1 parent 25accd3 commit 1e554d6
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 63 deletions.
29 changes: 19 additions & 10 deletions internal/chstorage/columns_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
)

type pointColumns struct {
name *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64
name *proto.ColLowCardinality[string]
nameNormalized *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64

mapping proto.ColEnum8
value proto.ColFloat64
Expand All @@ -18,14 +19,16 @@ type pointColumns struct {

func newPointColumns() *pointColumns {
return &pointColumns{
name: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
name: new(proto.ColStr).LowCardinality(),
nameNormalized: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
}
}

func (c *pointColumns) Columns() Columns {
return Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "mapping", Data: proto.Wrap(&c.mapping, metricMappingDDL)},
Expand All @@ -41,8 +44,9 @@ func (c *pointColumns) Input() proto.Input { return c.Columns().Input() }
func (c *pointColumns) Result() proto.Results { return c.Columns().Result() }

type expHistogramColumns struct {
name *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64
name *proto.ColLowCardinality[string]
nameNormalized *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64

count proto.ColUInt64
sum *proto.ColNullable[float64]
Expand All @@ -62,8 +66,9 @@ type expHistogramColumns struct {

func newExpHistogramColumns() *expHistogramColumns {
return &expHistogramColumns{
name: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
name: new(proto.ColStr).LowCardinality(),
nameNormalized: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),

sum: new(proto.ColFloat64).Nullable(),
min: new(proto.ColFloat64).Nullable(),
Expand All @@ -76,6 +81,7 @@ func newExpHistogramColumns() *expHistogramColumns {
func (c *expHistogramColumns) Columns() Columns {
return Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "exp_histogram_count", Data: &c.count},
Expand Down Expand Up @@ -125,8 +131,9 @@ func (c *labelsColumns) Input() proto.Input { return c.Columns().Input() }
func (c *labelsColumns) Result() proto.Results { return c.Columns().Result() }

type exemplarColumns struct {
name *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64
name *proto.ColLowCardinality[string]
nameNormalized *proto.ColLowCardinality[string]
timestamp *proto.ColDateTime64

filteredAttributes proto.ColStr
exemplarTimestamp *proto.ColDateTime64
Expand All @@ -141,6 +148,7 @@ type exemplarColumns struct {
func newExemplarColumns() *exemplarColumns {
return &exemplarColumns{
name: new(proto.ColStr).LowCardinality(),
nameNormalized: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
exemplarTimestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
}
Expand All @@ -149,6 +157,7 @@ func newExemplarColumns() *exemplarColumns {
func (c *exemplarColumns) Columns() Columns {
return Columns{
{Name: "name", Data: c.name},
{Name: "name_normalized", Data: c.nameNormalized},
{Name: "timestamp", Data: c.timestamp},

{Name: "filtered_attributes", Data: &c.filteredAttributes},
Expand Down
4 changes: 4 additions & 0 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (b *metricsBatch) addPoints(name string, res *lazyAttributes, slice pmetric
return errors.Wrap(err, "map exemplars")
}
c.name.Append(name)
c.nameNormalized.Append(otelstorage.KeyToLabel(name))
c.timestamp.Append(ts)
c.mapping.Append(proto.Enum8(noMapping))
c.value.Append(val)
Expand Down Expand Up @@ -366,6 +367,7 @@ func (b *metricsBatch) addExpHistogramPoints(name string, res *lazyAttributes, s
return errors.Wrap(err, "map exemplars")
}
c.name.Append(name)
c.nameNormalized.Append(otelstorage.KeyToLabel(name))
c.timestamp.Append(ts)
c.count.Append(count)
c.sum.Append(sum)
Expand Down Expand Up @@ -449,6 +451,7 @@ func (b *metricsBatch) addMappedSample(
) {
c := b.points
c.name.Append(name)
c.nameNormalized.Append(otelstorage.KeyToLabel(name))
c.timestamp.Append(series.Timestamp)
c.mapping.Append(proto.Enum8(mapping))
c.value.Append(val)
Expand Down Expand Up @@ -491,6 +494,7 @@ func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar, bucketK
}

c.name.Append(p.Name)
c.nameNormalized.Append(p.Name)
c.timestamp.Append(p.Timestamp)

c.filteredAttributes.Append(encodeAttributes(e.FilteredAttributes()))
Expand Down
58 changes: 5 additions & 53 deletions internal/chstorage/querier_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (q *Querier) Querier(mint, maxt int64) (storage.Querier, error) {
tables: q.tables,
tracer: q.tracer,
getLabelMapping: q.getMetricsLabelMapping,
getMetricName: q.getMetricName,
}, nil
}

Expand All @@ -52,7 +51,6 @@ type promQuerier struct {
ch chClient
tables Tables
getLabelMapping func(context.Context, []string) (map[string]string, error)
getMetricName func(context.Context, string) (string, error)

tracer trace.Tracer
}
Expand Down Expand Up @@ -181,47 +179,6 @@ func addLabelMatchers(query *strings.Builder, matchers []*labels.Matcher) error
return nil
}

func (q *Querier) getMetricName(ctx context.Context, name string) (metricMapped string, rerr error) {
ctx, span := q.tracer.Start(ctx, "getMetricName",
trace.WithAttributes(
attribute.String("chstorage.metric.name", name),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()
mapped := new(proto.ColStr)
if err := q.ch.Do(ctx, ch.Query{
Result: proto.Results{
{Name: "value", Data: mapped},
},
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < mapped.Rows(); i++ {
metricMapped = mapped.Row(i)
}
return nil
},
Body: fmt.Sprintf(`SELECT value FROM %[1]s WHERE name_normalized = %[2]s AND name = %[2]s AND value_normalized = %[3]s`,
q.tables.Labels, singleQuoted(labels.MetricName), singleQuoted(name),
),
}); err != nil {
return "", errors.Wrap(err, "select")
}
if metricMapped == "" {
// No mapping found.
metricMapped = name
}
span.AddEvent("fetched_metric_name_mapping",
trace.WithAttributes(
attribute.String("chstorage.metric.mapped", metricMapped),
),
)
return metricMapped, nil
}

func (q *Querier) getMetricsLabelMapping(ctx context.Context, input []string) (_ map[string]string, rerr error) {
ctx, span := q.tracer.Start(ctx, "getMetricsLabelMapping",
trace.WithAttributes(
Expand Down Expand Up @@ -367,7 +324,7 @@ func (p *promQuerier) selectSeries(ctx context.Context, sortSeries bool, hints *
}
{
selectors := []string{
"name",
"name_normalized",
}
if name := m.Name; name != labels.MetricName {
if mapped, ok := mapping[name]; ok {
Expand All @@ -379,13 +336,6 @@ func (p *promQuerier) selectSeries(ctx context.Context, sortSeries bool, hints *
}
}
value := m.Value
if m.Name == labels.MetricName {
mappedValue, err := p.getMetricName(ctx, m.Value)
if err != nil {
return "", errors.Wrap(err, "get metric name mapping")
}
value = mappedValue
}
query.WriteString("(\n")
for i, sel := range selectors {
if i != 0 {
Expand Down Expand Up @@ -478,6 +428,7 @@ func (p *promQuerier) queryPoints(ctx context.Context, query string) ([]storage.
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < c.timestamp.Rows(); i++ {
name := c.name.Row(i)
nameNormalized := c.nameNormalized.Row(i)
value := c.value.Row(i)
timestamp := c.timestamp.Row(i)
attributes := c.attributes.Row(i)
Expand All @@ -500,7 +451,7 @@ func (p *promQuerier) queryPoints(ctx context.Context, query string) ([]storage.
s.series.data.values = append(s.series.data.values, value)
s.series.ts = append(s.series.ts, timestamp.UnixMilli())

s.labels[labels.MetricName] = otelstorage.KeyToLabel(name)
s.labels[labels.MetricName] = nameNormalized
if err := parseLabels(resource, s.labels); err != nil {
return errors.Wrap(err, "parse resource")
}
Expand Down Expand Up @@ -543,6 +494,7 @@ func (p *promQuerier) queryExpHistograms(ctx context.Context, query string) ([]s
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < c.timestamp.Rows(); i++ {
name := c.name.Row(i)
nameNormalized := c.nameNormalized.Row(i)
timestamp := c.timestamp.Row(i)
count := c.count.Row(i)
sum := c.sum.Row(i)
Expand Down Expand Up @@ -583,7 +535,7 @@ func (p *promQuerier) queryExpHistograms(ctx context.Context, query string) ([]s
s.series.data.negativeBucketCounts = append(s.series.data.negativeBucketCounts, negativeBucketCounts)
s.series.ts = append(s.series.ts, timestamp.UnixMilli())

s.labels[labels.MetricName] = otelstorage.KeyToLabel(name)
s.labels[labels.MetricName] = nameNormalized
if err := parseLabels(resource, s.labels); err != nil {
return errors.Wrap(err, "parse resource")
}
Expand Down
3 changes: 3 additions & 0 deletions internal/chstorage/schema_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
pointsSchema = `
(
name LowCardinality(String),
name_normalized LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
mapping Enum8(` + metricMappingDDL + `) CODEC(T64),
Expand Down Expand Up @@ -54,6 +55,7 @@ const (
expHistogramsSchema = `
(
name LowCardinality(String),
name_normalized LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
exp_histogram_count UInt64,
Expand All @@ -76,6 +78,7 @@ const (
exemplarsSchema = `
(
name LowCardinality(String),
name_normalized LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
filtered_attributes String,
Expand Down

0 comments on commit 1e554d6

Please sign in to comment.