diff --git a/CHANGELOG.md b/CHANGELOG.md index 655005269f..71dc74b244 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Accepted into CNCF: request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`. - [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type - [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings +- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided overwrite the `query.replica-label` cli flags. - [#1395](https://github.com/thanos-io/thanos/pull/1395) Thanos Sidecar added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar. - [#1297](https://github.com/thanos-io/thanos/pull/1297) Thanos Compact added `/-/ready` and `/-/healthy` endpoints to Thanos compact. - [#1431](https://github.com/thanos-io/thanos/pull/1431) Thanos Query added hidden flag to allow the use of downsampled resolution data for instant queries. @@ -33,6 +34,7 @@ Accepted into CNCF: ### Changed +- [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`. - [#1414](https://github.com/thanos-io/thanos/pull/1413) Upgraded important dependencies: Prometheus to 2.12-rc.0. TSDB is now part of Prometheus. - [#1380](https://github.com/thanos-io/thanos/pull/1380) Upgraded important dependencies: Prometheus to 2.11.1 and TSDB to 0.9.1. Some changes affecting Querier: - [ENHANCEMENT] Query performance improvement: Efficient iteration and search in HashForLabels and HashWithoutLabels. #5707 diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 0eb2ac0399..b1b75dd5ae 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -63,8 +63,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() - replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). - String() + replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). + Strings() instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden()) @@ -146,7 +146,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string *maxConcurrentQueries, time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), - *replicaLabel, + *replicaLabels, selectorLset, *stores, *enableAutodownsampling, @@ -264,7 +264,7 @@ func runQuery( maxConcurrentQueries int, queryTimeout time.Duration, storeResponseTimeout time.Duration, - replicaLabel string, + replicaLabels []string, selectorLset labels.Labels, storeAddrs []string, enableAutodownsampling bool, @@ -312,7 +312,7 @@ func runQuery( unhealthyStoreTimeout, ) proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) - queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) + queryableCreator = query.NewQueryableCreator(logger, proxy) engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, @@ -404,7 +404,7 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, instantDefaultMaxSourceResolution) + api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) diff --git a/docs/components/query.md b/docs/components/query.md index d1e98cecbc..1bbb96e23b 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -23,10 +23,12 @@ $ thanos query \ ## Deduplication The query layer can deduplicate series that were collected from high-availability pairs of data sources such as Prometheus. -A fixed replica label must be chosen for the entire cluster and can then be passed to query nodes on startup. +A fixed single or multiple replica labels must be chosen for the entire cluster and can then be passed to query nodes on startup. Two or more series that are only distinguished by the given replica label, will be merged into a single time series. -This also hides gaps in collection of a single data source. For example: +This also hides gaps in collection of a single data source. + +### An example with a single replica labels: * Prometheus + sidecar "A": `cluster=1,env=2,replica=A` * Prometheus + sidecar "B": `cluster=1,env=2,replica=B` @@ -47,12 +49,28 @@ And we query for metric `up{job="prometheus",env="2"}` with this option we will * `up{job="prometheus",env="2",cluster="1"} 1` * `up{job="prometheus",env="2",cluster="2"} 1` -WITHOUT this replica flag (so deduplication turned off), we will get 3 results: +WITHOUT this replica flag (deduplication turned off), we will get 3 results: * `up{job="prometheus",env="2",cluster="1",replica="A"} 1` * `up{job="prometheus",env="2",cluster="1",replica="B"} 1` * `up{job="prometheus",env="2",cluster="2",replica="A"} 1` +### The same output will be present for this example with multiple replica labels: + +* Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A` +* Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B` +* Prometheus + sidecar "A" in different cluster: `cluster=2,env=2,replica=A,replicaX=A` + +``` +$ thanos query \ + --http-address "0.0.0.0:9090" \ + --query.replica-label "replica" \ + --query.replica-label "replicaX" \ + --store ":" \ + --store ":" \ +``` + + This logic can also be controlled via parameter on QueryAPI. More details below. ## Query API @@ -89,6 +107,15 @@ Querier also allows to configure different timeouts: If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response. +### Deduplication replica labels. + +| HTTP URL/FORM parameter | Type | Default | Example | +|----|----|----|----| +| `replicaLabels` | `[]string` | `query.replica-label` flag (default: empty). | `replicaLabels=replicaA&replicaLabels=replicaB` | +| | | | | + +This overwrites the `query.replica-label` cli flag to allow dynamic replica labels at query time. + ### Deduplication Enabled | HTTP URL/FORM parameter | Type | Default | Example | @@ -96,7 +123,7 @@ strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warn | `dedup` | `Boolean` | True, but effect depends on `query.replica` configuration flag. | `1, t, T, TRUE, true, True` for "True" | | | | | | -This controls if query should use `replica` label for deduplication or not. +This controls if query results should be deduplicated using the replica labels. ### Auto downsampling @@ -236,8 +263,8 @@ Flags: --query.timeout=2m Maximum time to process query by query node. --query.max-concurrent=20 Maximum number of queries processed concurrently by query node. - --query.replica-label=QUERY.REPLICA-LABEL - Label to treat as a replica indicator along + --query.replica-label=QUERY.REPLICA-LABEL ... + Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. diff --git a/docs/getting-started.md b/docs/getting-started.md index 3ef426cc02..29665e5212 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -197,6 +197,7 @@ thanos query \ --store 1.2.3.4:19090 \ --store 1.2.3.5:19090 \ --query.replica-label replica # Replica label for de-duplication + --query.replica-label replicaX # Supports multiple replica labels for de-duplication ``` Go to the configured HTTP address, and you should now be able to query across all Prometheus instances and receive de-duplicated data. diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index ef595b5a73..c2e34220a1 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -102,6 +102,7 @@ type API struct { enableAutodownsampling bool enablePartialResponse bool + replicaLabels []string reg prometheus.Registerer defaultInstantQueryMaxSourceResolution time.Duration @@ -116,6 +117,7 @@ func NewAPI( c query.QueryableCreator, enableAutodownsampling bool, enablePartialResponse bool, + replicaLabels []string, defaultInstantQueryMaxSourceResolution time.Duration, ) *API { return &API{ @@ -124,6 +126,7 @@ func NewAPI( queryableCreate: c, enableAutodownsampling: enableAutodownsampling, enablePartialResponse: enablePartialResponse, + replicaLabels: replicaLabels, reg: reg, defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, @@ -185,6 +188,21 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool return enableDeduplication, nil } +func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) { + const replicaLabelsParam = "replicaLabels[]" + if err := r.ParseForm(); err != nil { + return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} + } + + replicaLabels = api.replicaLabels + // Overwrite the cli flag when provided as a query parameter. + if len(r.Form[replicaLabelsParam]) > 0 { + replicaLabels = r.Form[replicaLabelsParam] + } + + return replicaLabels, nil +} + func (api *API) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *ApiError) { const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution := 0 * time.Second @@ -212,6 +230,7 @@ func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialRespons const partialResponseParam = "partial_response" enablePartialResponse = api.enablePartialResponse + // Overwrite the cli flag when provided as a query parameter. if val := r.FormValue(partialResponseParam); val != "" { var err error enablePartialResponse, err = strconv.ParseBool(val) @@ -255,6 +274,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) if apiErr != nil { return nil, nil, apiErr @@ -269,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts) + qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts) if err != nil { return nil, nil, &ApiError{errorBadData, err} } @@ -341,6 +365,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + // If no max_source_resolution is specified fit at least 5 samples between steps. maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step/5) if apiErr != nil { @@ -357,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { defer span.Finish() qry, err := api.queryEngine.NewRangeQuery( - api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse), + api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), start, end, @@ -397,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -463,13 +492,18 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) if apiErr != nil { return nil, nil, apiErr } // TODO(bwplotka): Support downsampling? - q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -572,7 +606,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 69a26dce02..6c5654b535 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -29,47 +29,83 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" + tsdb_labels "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/component" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/testutil" ) -func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(_ bool, _ int64, _ bool) storage.Queryable { - return queryable - } -} - func TestEndpoints(t *testing.T) { - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar"} 0+100x100 - test_metric1{foo="boo"} 1+0x100 - test_metric2{foo="boo"} 1+0x100 - `) - if err != nil { - t.Fatal(err) + defer leaktest.CheckTimeout(t, 10*time.Second)() + + lbls := []tsdb_labels.Labels{ + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric2"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "b"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica1", Value: "a"}, + }, } - defer suite.Close() - if err := suite.Run(); err != nil { - t.Fatal(err) + db, err := testutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) + + app := db.Appender() + for _, lbl := range lbls { + for i := int64(0); i < 10; i++ { + _, err := app.Add(lbl, i*60000, float64(i)) + testutil.Ok(t, err) + } } + testutil.Ok(t, app.Commit()) now := time.Now() - api := &API{ - queryableCreate: testQueryableCreator(suite.Storage()), - queryEngine: suite.QueryEngine(), - + queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)), + queryEngine: promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 20, + MaxSamples: 10000, + Timeout: 100 * time.Second, + }), now: func() time.Time { return now }, } @@ -125,6 +161,211 @@ func TestEndpoints(t *testing.T) { }, }, }, + // Query endpoint without deduplication. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "b", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with single deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with multiple deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica", "replica1"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, { endpoint: api.query, query: url.Values{ @@ -269,6 +510,7 @@ func TestEndpoints(t *testing.T) { response: []string{ "test_metric1", "test_metric2", + "test_metric_replica1", }, }, { diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 589da98e10..65d0c1db0f 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -327,8 +327,8 @@ func (it *chunkSeriesIterator) Err() error { } type dedupSeriesSet struct { - set storage.SeriesSet - replicaLabel string + set storage.SeriesSet + replicaLabels map[string]struct{} replicas []storage.Series lset labels.Labels @@ -336,8 +336,8 @@ type dedupSeriesSet struct { ok bool } -func newDedupSeriesSet(set storage.SeriesSet, replicaLabel string) storage.SeriesSet { - s := &dedupSeriesSet{set: set, replicaLabel: replicaLabel} +func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}) storage.SeriesSet { + s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels} s.ok = s.set.Next() if s.ok { s.peek = s.set.At() @@ -360,10 +360,18 @@ func (s *dedupSeriesSet) Next() bool { // replica label if it exists func (s *dedupSeriesSet) peekLset() labels.Labels { lset := s.peek.Labels() - if lset[len(lset)-1].Name != s.replicaLabel { + if len(s.replicaLabels) == 0 { return lset } - return lset[:len(lset)-1] + // Check how many replica labels are present so that these are removed. + var totalToRemove int + for index := 0; index < len(s.replicaLabels); index++ { + if _, ok := s.replicaLabels[lset[len(lset)-index-1].Name]; ok { + totalToRemove++ + } + } + // Strip all present replica labels. + return lset[:len(lset)-totalToRemove] } func (s *dedupSeriesSet) next() bool { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index aefe9a2ebe..b6036e5ad0 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -14,17 +14,19 @@ import ( ) // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. -// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default. +// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. +// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying +// replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. -type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. -func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator { - return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable { +func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator { + return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { return &queryable{ logger: logger, - replicaLabel: replicaLabel, + replicaLabels: replicaLabels, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -35,7 +37,7 @@ func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLa type queryable struct { logger log.Logger - replicaLabel string + replicaLabels []string proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -44,7 +46,7 @@ type queryable struct { // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil } type querier struct { @@ -52,7 +54,7 @@ type querier struct { logger log.Logger cancel func() mint, maxt int64 - replicaLabel string + replicaLabels map[string]struct{} proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -65,7 +67,7 @@ func newQuerier( ctx context.Context, logger log.Logger, mint, maxt int64, - replicaLabel string, + replicaLabels []string, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, @@ -75,13 +77,18 @@ func newQuerier( logger = log.NewNopLogger() } ctx, cancel := context.WithCancel(ctx) + + rl := make(map[string]struct{}) + for _, replicaLabel := range replicaLabels { + rl[replicaLabel] = struct{}{} + } return &querier{ ctx: ctx, logger: logger, cancel: cancel, mint: mint, maxt: maxt, - replicaLabel: replicaLabel, + replicaLabels: rl, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -90,7 +97,7 @@ func newQuerier( } func (q *querier) isDedupEnabled() bool { - return q.deduplicate && q.replicaLabel != "" + return q.deduplicate && len(q.replicaLabels) > 0 } type seriesServer struct { @@ -193,7 +200,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // TODO(fabxc): this could potentially pushed further down into the store API // to make true streaming possible. - sortDedupLabels(resp.seriesSet, q.replicaLabel) + sortDedupLabels(resp.seriesSet, q.replicaLabels) set := &promSeriesSet{ mint: q.mint, @@ -205,19 +212,19 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabel), warns, nil + return newDedupSeriesSet(set, q.replicaLabels), warns, nil } -// sortDedupLabels resorts the set so that the same series with different replica +// sortDedupLabels re-sorts the set so that the same series with different replica // labels are coming right after each other. -func sortDedupLabels(set []storepb.Series, replicaLabel string) { +func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) { for _, s := range set { - // Move the replica label to the very end. + // Move the replica labels to the very end. sort.Slice(s.Labels, func(i, j int) bool { - if s.Labels[i].Name == replicaLabel { + if _, ok := replicaLabels[s.Labels[i].Name]; ok { return false } - if s.Labels[j].Name == replicaLabel { + if _, ok := replicaLabels[s.Labels[j].Name]; ok { return true } return s.Labels[i].Name < s.Labels[j].Name diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index a5a280f83c..00d416e88c 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -25,10 +25,10 @@ import ( func TestQueryableCreator_MaxResolution(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} - queryableCreator := NewQueryableCreator(nil, testProxy, "test") + queryableCreator := NewQueryableCreator(nil, testProxy) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, oneHourMillis, false) + queryable := queryableCreator(false, nil, oneHourMillis, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy, "")(false, 9999999, false) + q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -176,7 +176,7 @@ func TestQuerier_Series(t *testing.T) { // Querier clamps the range to [1,300], which should drop some samples of the result above. // The store API allows endpoints to send more data then initially requested. - q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true) + q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true) defer func() { testutil.Ok(t, q.Close()) }() res, _, err := q.Select(&storage.SelectParams{}) @@ -220,56 +220,139 @@ func TestQuerier_Series(t *testing.T) { func TestSortReplicaLabel(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - set := []storepb.Series{ - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "3"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "3"}, - {Name: "d", Value: "4"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "4"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-2"}, - {Name: "c", Value: "3"}, - }}, + tests := []struct { + input []storepb.Series + exp []storepb.Series + dedupLabels map[string]struct{} + }{ + // 0 Single deduplication label. + { + input: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "3"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, + }, + exp: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + {Name: "b", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "4"}, + {Name: "b", Value: "replica-1"}, + }}, + }, + dedupLabels: map[string]struct{}{"b": struct{}{}}, + }, + // 1 Multi deduplication labels. + { + input: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "3"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "b1", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, + }, + exp: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + {Name: "b1", Value: "replica-2"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "4"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + }, + dedupLabels: map[string]struct{}{ + "b": struct{}{}, + "b1": struct{}{}, + }, + }, } - - sortDedupLabels(set, "b") - - exp := []storepb.Series{ - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "b", Value: "replica-1"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "b", Value: "replica-2"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "d", Value: "4"}, - {Name: "b", Value: "replica-1"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "4"}, - {Name: "b", Value: "replica-1"}, - }}, + for _, test := range tests { + t.Run("", func(t *testing.T) { + sortDedupLabels(test.input, test.dedupLabels) + testutil.Equals(t, test.exp, test.input) + }) } - testutil.Equals(t, exp, set) } func expandSeries(t testing.TB, it storage.SeriesIterator) (res []sample) { @@ -284,91 +367,200 @@ func expandSeries(t testing.TB, it storage.SeriesIterator) (res []sample) { func TestDedupSeriesSet(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - input := []struct { - lset []storepb.Label - vals []sample - }{ - { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, - vals: []sample{{60000, 3}, {70000, 4}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{200000, 5}, {210000, 6}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{60000, 3}, {70000, 4}}, - }, - } - exp := []struct { - lset labels.Labels - vals []sample + tests := []struct { + input []struct { + lset []storepb.Label + vals []sample + } + exp []struct { + lset labels.Labels + vals []sample + } + dedupLabels map[string]struct{} }{ - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, + { // 0 Single dedup label. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{200000, 5}, {210000, 6}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, + }, + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + }, }, - { - lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + { // 1 Multi dedup label. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}, {Name: "replicaA", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{200000, 5}, {210000, 6}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, + }, + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + "replicaA": struct{}{}, + }, }, - } - var series []storepb.Series - for _, c := range input { - chk := chunkenc.NewXORChunk() - app, _ := chk.Appender() - for _, s := range c.vals { - app.Append(s.t, s.v) - } - series = append(series, storepb.Series{ - Labels: c.lset, - Chunks: []storepb.AggrChunk{ - {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}}, + { // 2 Multi dedup label - some series don't have all dedup labels. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, }, - }) - } - set := &promSeriesSet{ - mint: 1, - maxt: math.MaxInt64, - set: newStoreSeriesSet(series), + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + "replicaA": struct{}{}, + }, + }, } - dedupSet := newDedupSeriesSet(set, "replica") - i := 0 - for dedupSet.Next() { - testutil.Equals(t, exp[i].lset, dedupSet.At().Labels()) - - res := expandSeries(t, dedupSet.At().Iterator()) - testutil.Equals(t, exp[i].vals, res) - i++ + for _, test := range tests { + t.Run("", func(t *testing.T) { + var series []storepb.Series + for _, c := range test.input { + chk := chunkenc.NewXORChunk() + app, _ := chk.Appender() + for _, s := range c.vals { + app.Append(s.t, s.v) + } + series = append(series, storepb.Series{ + Labels: c.lset, + Chunks: []storepb.AggrChunk{ + {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}}, + }, + }) + } + set := &promSeriesSet{ + mint: 1, + maxt: math.MaxInt64, + set: newStoreSeriesSet(series), + } + dedupSet := newDedupSeriesSet(set, test.dedupLabels) + + i := 0 + for dedupSet.Next() { + testutil.Equals(t, test.exp[i].lset, dedupSet.At().Labels(), "labels mismatch at index:%v", i) + res := expandSeries(t, dedupSet.At().Iterator()) + testutil.Equals(t, test.exp[i].vals, res, "values mismatch at index:%v", i) + i++ + } + testutil.Ok(t, dedupSet.Err()) + }) } - testutil.Ok(t, dedupSet.Err()) } func TestDedupSeriesIterator(t *testing.T) {