Skip to content

Commit

Permalink
Support active count_method in label names cardinality (#7085)
Browse files Browse the repository at this point in the history
* Support active count_method in label names cardinality

In #5136, we added the
`count_method` param to the label values cardinality API. This PR does
the same for the label names cardinality API.

* Fix failing test

* Fix more failing tests

* Fix more failing tests

* Fix the last failing test

* Add test for new functionality

* Add test for filter and make it actually work

* Update docs

* Update changelog

* Add license header

* Apply PR feedback
  • Loading branch information
Logiraptor authored Jan 12, 2024
1 parent 18c9df3 commit 7ce863f
Show file tree
Hide file tree
Showing 19 changed files with 393 additions and 159 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* [CHANGE] Server: responses with HTTP 4xx status codes are now treated as errors and used in `status_code` label of request duration metric. #7045
* [CHANGE] Memberlist: change default for `-memberlist.stream-timeout` from `10s` to `2s`. #7076
* [CHANGE] Memcached: remove legacy `thanos_cache_memcached_*` and `thanos_memcached_*` prefixed metrics. Instead, Memcached and Redis cache clients now emit `thanos_cache_*` prefixed metrics with a `backend` label. #7076
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [FEATURE] Cardinality API: added a new `count_method` parameter which enables counting active label values. #7085
* [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848
* [ENHANCEMENT] PromQL: ignore small errors for bucketQuantile #6766
* [ENHANCEMENT] Distributor: improve efficiency of some errors #6785
Expand All @@ -19,7 +21,6 @@
* [ENHANCEMENT] Distributor: `-distributor.remote-timeout` is now accounted from the first ingester push request being sent. #6972
* [ENHANCEMENT] Storage Provider: allow aws sts support for s3 storage provider #6172
* [ENHANCEMENT] Querier: add `cortex_querier_queries_storage_type_total ` metric that indicates how many queries have executed for a source, ingesters or store-gateways. Add `cortex_query_storegateway_chunks_total` metric to count the number of chunks fetched from a store gateway. #7099
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [ENHANCEMENT] Query-frontend: add experimental support for sharding active series queries via `-query-frontend.shard-active-series-queries`. #6784
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
Expand Down
18 changes: 15 additions & 3 deletions docs/sources/mimir/references/http-api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -554,23 +554,35 @@ GET,POST <prometheus-http-prefix>/api/v1/cardinality/label_names
Returns label names cardinality across all ingesters, for the authenticated tenant, in `JSON` format.
It counts distinct label values per label name.

As far as this endpoint generates cardinality report using only values from currently opened TSDBs in ingesters, two subsequent calls may return completely different results, if ingester did a block
cutting between the calls.

The items in the field `cardinality` are sorted by `label_values_count` in DESC order and by `label_name` in ASC order.
The count of items is limited by `limit` request param.

This endpoint is disabled by default and can be enabled via the `-querier.cardinality-analysis-enabled` CLI flag (or its respective YAML config option).

Requires [authentication](#authentication).

#### Count series by `inmemory` or `active`

Two methods of counting are available: `inmemory` and `active`. To choose one, use the `count_method` parameter.

The `inmemory` method counts the labels in currently opened TSDBs in Mimir's ingesters.
Two subsequent calls might return completely different results if an ingester cut a block between calls.
This method of counting is most useful for understanding ingester memory usage.

The `active` method also counts labels in currently opened TSDBs in Mimir's ingesters, but filters out values that have not received a sample within a configurable duration of time.
To configure this duration, use the `-ingester.active-series-metrics-idle-timeout` parameter.
This method of counting is most useful for understanding what label values are represented in the samples ingested by Mimir in the last `-ingester.active-series-metrics-idle-timeout`.
Two subsequent calls will likely return similar results, because this window of time is not related to the block cutting on ingesters.
Values will change only as a result of changes in the data ingested by Mimir.

#### Caching

The query-frontend can return a stale response fetched from the query results cache if `-query-frontend.cache-results` is enabled and `-query-frontend.results-cache-ttl-for-cardinality-query` set to a value greater than `0`.

#### Request params

- **selector** - _optional_ - specifies PromQL selector that will be used to filter series that must be analyzed.
- **count_method** - _optional_ - specifies which series counting method will be used. (default="inmemory", available options=["inmemory", "active"])
- **limit** - _optional_ - specifies max count of items in field `cardinality` in response (default=20, min=0, max=500)

#### Response schema
Expand Down
14 changes: 12 additions & 2 deletions pkg/cardinality/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
)

type LabelNamesRequest struct {
Matchers []*labels.Matcher
Limit int
Matchers []*labels.Matcher
CountMethod CountMethod
Limit int
}

// Strings returns a full representation of the request. The returned string can be
Expand All @@ -51,6 +52,10 @@ func (r *LabelNamesRequest) String() string {
b.WriteString(matcher.String())
}

// Add count method.
b.WriteRune(stringParamSeparator)
b.WriteString(string(r.CountMethod))

// Add limit.
b.WriteRune(stringParamSeparator)
b.WriteString(strconv.Itoa(r.Limit))
Expand Down Expand Up @@ -85,6 +90,11 @@ func DecodeLabelNamesRequestFromValues(values url.Values) (*LabelNamesRequest, e
return nil, err
}

parsed.CountMethod, err = extractCountMethod(values)
if err != nil {
return nil, err
}

return parsed, nil
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/cardinality/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import (
func TestDecodeLabelNamesRequest(t *testing.T) {
var (
params = url.Values{
"selector": []string{`{second!="2",first="1"}`},
"limit": []string{"100"},
"selector": []string{`{second!="2",first="1"}`},
"count_method": []string{"active"},
"limit": []string{"100"},
}

expected = &LabelNamesRequest{
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "first", "1"),
labels.MustNewMatcher(labels.MatchNotEqual, "second", "2"),
},
Limit: 100,
CountMethod: ActiveMethod,
Limit: 100,
}
)

Expand Down Expand Up @@ -65,10 +67,11 @@ func TestLabelNamesRequest_String(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "first", "1"),
labels.MustNewMatcher(labels.MatchNotEqual, "second", "2"),
},
Limit: 100,
CountMethod: ActiveMethod,
Limit: 100,
}

assert.Equal(t, "first=\"1\"\x01second!=\"2\"\x00100", req.String())
assert.Equal(t, "first=\"1\"\x01second!=\"2\"\x00active\x00100", req.String())
}

func TestDecodeLabelValuesRequest(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,13 +1509,13 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
}

// LabelNamesAndValues query ingesters for label names and values and returns labels with distinct list of values.
func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher) (*ingester_client.LabelNamesAndValuesResponse, error) {
func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error) {
replicationSet, err := d.GetIngesters(ctx)
if err != nil {
return nil, err
}

req, err := toLabelNamesCardinalityRequest(matchers)
req, err := toLabelNamesCardinalityRequest(matchers, countMethod)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1546,12 +1546,19 @@ type labelNamesAndValuesResponseMerger struct {
currentSizeBytes int
}

func toLabelNamesCardinalityRequest(matchers []*labels.Matcher) (*ingester_client.LabelNamesAndValuesRequest, error) {
func toLabelNamesCardinalityRequest(matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesRequest, error) {
matchersProto, err := ingester_client.ToLabelMatchers(matchers)
if err != nil {
return nil, err
}
return &ingester_client.LabelNamesAndValuesRequest{Matchers: matchersProto}, nil
ingesterCountMethod, err := toIngesterCountMethod(countMethod)
if err != nil {
return nil, err
}
return &ingester_client.LabelNamesAndValuesRequest{
Matchers: matchersProto,
CountMethod: ingesterCountMethod,
}, nil
}

// toLabelNamesAndValuesResponses converts map with distinct label values to `ingester_client.LabelNamesAndValuesResponse`.
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2595,7 +2595,7 @@ func TestDistributor_LabelNamesAndValuesLimitTest(t *testing.T) {
require.NoError(t, err)
}

_, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{})
_, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod)
if len(testData.expectedError) == 0 {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -2728,7 +2728,7 @@ func TestDistributor_LabelNamesAndValues(t *testing.T) {

// Assert on metric metadata
timeBeforeExecution := time.Now()
response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{})
response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod)
require.NoError(t, err)
if len(testData.zonesResponseDelay) > 0 {
executionDuration := time.Since(timeBeforeExecution)
Expand Down Expand Up @@ -2762,7 +2762,7 @@ func TestDistributor_LabelValuesCardinality_ExpectedAllIngestersResponsesToBeCom
// Also, it simulates delay from zone C to verify that there is no race condition. must be run with `-race` flag (race detection).
func TestDistributor_LabelNamesAndValues_ExpectedAllPossibleLabelNamesAndValuesToBeReturned(t *testing.T) {
ctx, ds := prepareWithZoneAwarenessAndZoneDelay(t, createSeries(10000))
response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{})
response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod)
require.NoError(t, err)
require.Len(t, response.Items, 1)
require.Equal(t, 10000, len(response.Items[0].Values))
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/querymiddleware/cardinality_query_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestCardinalityQueryCache_RoundTrip_WithTenantFederation(t *testing.T) {

// Create the request.
reqURL := mustParseURL(t, `/prometheus/api/v1/cardinality/label_names?selector={job="test"}&limit=100`)
reqCacheKey := tenant.JoinTenantIDs(testData.tenantIDs) + ":job=\"test\"\x00100"
reqCacheKey := tenant.JoinTenantIDs(testData.tenantIDs) + ":job=\"test\"\x00inmemory\x00100"
reqHashedCacheKey := cardinalityLabelNamesQueryCachePrefix + cacheHashKey(reqCacheKey)

req := &http.Request{URL: reqURL}
Expand Down Expand Up @@ -102,9 +102,9 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
testGenericQueryCacheRoundTrip(t, newCardinalityQueryCacheRoundTripper, "cardinality", map[string]testGenericQueryCacheRequestType{
"label names request": {
reqPath: "/prometheus/api/v1/cardinality/label_names",
reqData: url.Values{"selector": []string{`{job="test"}`}, "limit": []string{"100"}},
cacheKey: "user-1:job=\"test\"\x00100",
hashedCacheKey: cardinalityLabelNamesQueryCachePrefix + cacheHashKey("user-1:job=\"test\"\x00100"),
reqData: url.Values{"selector": []string{`{job="test"}`}, "limit": []string{"100"}, "count_method": []string{"active"}},
cacheKey: "user-1:job=\"test\"\x00active\x00100",
hashedCacheKey: cardinalityLabelNamesQueryCachePrefix + cacheHashKey("user-1:job=\"test\"\x00active\x00100"),
},
"label values request": {
reqPath: "/prometheus/api/v1/cardinality/label_values",
Expand Down
23 changes: 23 additions & 0 deletions pkg/ingester/activeseries/active_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// SPDX-License-Identifier: AGPL-3.0-only

package activeseries

import (
"context"

"github.com/prometheus/prometheus/tsdb/index"
)

type PostingsReader interface {
Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
}

func IsLabelValueActive(ctx context.Context, reader PostingsReader, activeSeries *ActiveSeries, name, value string) (bool, error) {
valuePostings, err := reader.Postings(ctx, name, value)
if err != nil {
return false, err
}

activePostings := NewPostings(activeSeries, valuePostings)
return activePostings.Next(), nil
}
77 changes: 77 additions & 0 deletions pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: AGPL-3.0-only

package activeseries

import (
"context"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
)

type mockPostingsReader struct {
postings *index.MemPostings
}

func (m *mockPostingsReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
valuePostings := make([]index.Postings, 0, len(values))

for _, value := range values {
valuePostings = append(valuePostings, m.postings.Get(name, value))
}

return index.Merge(ctx, valuePostings...), nil
}

func TestIsLabelValueActive(t *testing.T) {
ctx := context.Background()
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

memPostings := index.NewMemPostings()
for i, l := range series {
memPostings.Add(allStorageRefs[i], l)
}
reader := &mockPostingsReader{postings: memPostings}

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1)
}

valid := activeSeries.Purge(mockedTime)
require.True(t, valid)

result, err := IsLabelValueActive(ctx, reader, activeSeries, "a", "1")
require.NoError(t, err)
require.False(t, result)

result, err = IsLabelValueActive(ctx, reader, activeSeries, "a", "2")
require.NoError(t, err)
require.False(t, result)

result, err = IsLabelValueActive(ctx, reader, activeSeries, "a", "3")
require.NoError(t, err)
require.False(t, result)

result, err = IsLabelValueActive(ctx, reader, activeSeries, "a", "4")
require.NoError(t, err)
require.True(t, result)

result, err = IsLabelValueActive(ctx, reader, activeSeries, "a", "5")
require.NoError(t, err)
require.True(t, result)
}
Loading

0 comments on commit 7ce863f

Please sign in to comment.