Skip to content

Commit

Permalink
replace model.Metric with labels.Labels in distributor.MetricsForLabe…
Browse files Browse the repository at this point in the history
…lMatchers() (#1788)
  • Loading branch information
ch33hau authored Apr 29, 2022
1 parent 971a26c commit 62148b2
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 35 deletions.
8 changes: 4 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match
}

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -1282,15 +1282,15 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

metrics := map[model.Fingerprint]model.Metric{}
metrics := map[uint64]labels.Labels{}
for _, resp := range resps {
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*ingester_client.MetricsForLabelMatchersResponse))
for _, m := range ms {
metrics[m.Fingerprint()] = m
metrics[m.Hash()] = m
}
}

result := make([]model.Metric, 0, len(metrics))
result := make([]labels.Labels, 0, len(metrics))
for _, m := range metrics {
result = append(result, m)
}
Expand Down
27 changes: 13 additions & 14 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/chunkcompat"
"github.com/grafana/mimir/pkg/util/limiter"
util_math "github.com/grafana/mimir/pkg/util/math"
Expand Down Expand Up @@ -1755,23 +1754,23 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
tests := map[string]struct {
shuffleShardSize int
matchers []*labels.Matcher
expectedResult []model.Metric
expectedResult []labels.Labels
expectedIngesters int
}{
"should return an empty response if no metric match": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
},
expectedResult: []model.Metric{},
expectedResult: []labels.Labels{},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []model.Metric{
util.LabelsToMetric(fixtures[0].lbls),
util.LabelsToMetric(fixtures[1].lbls),
expectedResult: []labels.Labels{
fixtures[0].lbls,
fixtures[1].lbls,
},
expectedIngesters: numIngesters,
},
Expand All @@ -1780,18 +1779,18 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
mustNewMatcher(labels.MatchEqual, "status", "200"),
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []model.Metric{
util.LabelsToMetric(fixtures[0].lbls),
expectedResult: []labels.Labels{
fixtures[0].lbls,
},
expectedIngesters: numIngesters,
},
"should return all matching metrics even if their FastFingerprint collide": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "fast_fingerprint_collision"),
},
expectedResult: []model.Metric{
util.LabelsToMetric(fixtures[3].lbls),
util.LabelsToMetric(fixtures[4].lbls),
expectedResult: []labels.Labels{
fixtures[3].lbls,
fixtures[4].lbls,
},
expectedIngesters: numIngesters,
},
Expand All @@ -1800,9 +1799,9 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []model.Metric{
util.LabelsToMetric(fixtures[0].lbls),
util.LabelsToMetric(fixtures[1].lbls),
expectedResult: []labels.Labels{
fixtures[0].lbls,
fixtures[1].lbls,
},
expectedIngesters: 3,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (mo
}

// FromMetricsForLabelMatchersResponse unpacks a MetricsForLabelMatchersResponse proto
func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric {
metrics := []model.Metric{}
func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []labels.Labels {
metrics := []labels.Labels{}
for _, m := range resp.Metric {
metrics = append(metrics, mimirpb.FromLabelAdaptersToMetric(m.Labels))
metrics = append(metrics, mimirpb.FromLabelAdaptersToLabels(m.Labels))
}
return metrics
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Distributor interface {
QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error)
LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, matchers ...*labels.Matcher) ([]string, error)
LabelNames(ctx context.Context, from model.Time, to model.Time, matchers ...*labels.Matcher) ([]string, error)
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error)
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher) (*client.LabelNamesAndValuesResponse, error)
LabelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher) (uint64, *client.LabelValuesCardinalityResponse, error)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(ms)
return series.LabelsToSeriesSet(ms)
}

return q.streamingSelect(ctx, minT, maxT, matchers)
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
distributor := &mockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, nil, testData.queryIngestersWithin, log.NewNopLogger())
Expand Down Expand Up @@ -391,9 +391,9 @@ func (m *mockDistributor) LabelNames(ctx context.Context, from, to model.Time, m
args := m.Called(ctx, from, to, matchers)
return args.Get(0).([]string), args.Error(1)
}
func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).([]model.Metric), args.Error(1)
return args.Get(0).([]labels.Labels), args.Error(1)
}

func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {

t.Run("series", func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, nil, nil, log.NewNopLogger(), nil)
q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
Expand Down Expand Up @@ -771,7 +771,7 @@ func TestQuerier_MaxLabelsQueryRange(t *testing.T) {

t.Run("series", func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, storeQueryable, nil, log.NewNopLogger(), nil)
q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
Expand Down Expand Up @@ -856,7 +856,7 @@ func (m *errDistributor) LabelValuesForLabelName(context.Context, model.Time, mo
func (m *errDistributor) LabelNames(context.Context, model.Time, model.Time, ...*labels.Matcher) ([]string, error) {
return nil, errDistributorError
}
func (m *errDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (m *errDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
return nil, errDistributorError
}

Expand Down Expand Up @@ -894,7 +894,7 @@ func (d *emptyDistributor) LabelNames(context.Context, model.Time, model.Time, .
return nil, nil
}

func (d *emptyDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *emptyDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
return nil, nil
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/series/series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ func MatrixToSeriesSet(m model.Matrix) storage.SeriesSet {
return NewConcreteSeriesSet(series)
}

// MetricsToSeriesSet creates a storage.SeriesSet from a []metric.Metric
func MetricsToSeriesSet(ms []model.Metric) storage.SeriesSet {
series := make([]storage.Series, 0, len(ms))
for _, m := range ms {
// LabelsToSeriesSet creates a storage.SeriesSet from a []labels.Labels
func LabelsToSeriesSet(ls []labels.Labels) storage.SeriesSet {
series := make([]storage.Series, 0, len(ls))
for _, l := range ls {
series = append(series, &ConcreteSeries{
labels: metricToLabels(m),
labels: l,
samples: nil,
})
}
Expand Down

0 comments on commit 62148b2

Please sign in to comment.