Skip to content

Commit

Permalink
Query: allow multiple deduplication label. (#1362)
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev authored and bwplotka committed Sep 16, 2019
1 parent cc17395 commit f4757f5
Show file tree
Hide file tree
Showing 9 changed files with 706 additions and 193 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ Accepted into CNCF:
request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`.
- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings
- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided overwrite the `query.replica-label` cli flags.
- [#1395](https://github.com/thanos-io/thanos/pull/1395) Thanos Sidecar added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar.
- [#1297](https://github.com/thanos-io/thanos/pull/1297) Thanos Compact added `/-/ready` and `/-/healthy` endpoints to Thanos compact.
- [#1431](https://github.com/thanos-io/thanos/pull/1431) Thanos Query added hidden flag to allow the use of downsampled resolution data for instant queries.
- [#1408](https://github.com/thanos-io/thanos/pull/1408) Thanos Store Gateway can now allow the specifying of supported time ranges it will serve (time sharding). Flags: `min-time` & `max-time`

### Changed

- [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`.
- [#1414](https://github.com/thanos-io/thanos/pull/1413) Upgraded important dependencies: Prometheus to 2.12-rc.0. TSDB is now part of Prometheus.
- [#1380](https://github.com/thanos-io/thanos/pull/1380) Upgraded important dependencies: Prometheus to 2.11.1 and TSDB to 0.9.1. Some changes affecting Querier:
- [ENHANCEMENT] Query performance improvement: Efficient iteration and search in HashForLabels and HashWithoutLabels. #5707
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
String()
replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -146,7 +146,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
*replicaLabels,
selectorLset,
*stores,
*enableAutodownsampling,
Expand Down Expand Up @@ -264,7 +264,7 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
replicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
Expand Down Expand Up @@ -312,7 +312,7 @@ func runQuery(
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down Expand Up @@ -404,7 +404,7 @@ func runQuery(
ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, instantDefaultMaxSourceResolution)
api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

Expand Down
39 changes: 33 additions & 6 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ $ thanos query \
## Deduplication

The query layer can deduplicate series that were collected from high-availability pairs of data sources such as Prometheus.
A fixed replica label must be chosen for the entire cluster and can then be passed to query nodes on startup.
A fixed single or multiple replica labels must be chosen for the entire cluster and can then be passed to query nodes on startup.

Two or more series that are only distinguished by the given replica label, will be merged into a single time series.
This also hides gaps in collection of a single data source. For example:
This also hides gaps in collection of a single data source.

### An example with a single replica labels:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B`
Expand All @@ -47,12 +49,28 @@ And we query for metric `up{job="prometheus",env="2"}` with this option we will
* `up{job="prometheus",env="2",cluster="1"} 1`
* `up{job="prometheus",env="2",cluster="2"} 1`

WITHOUT this replica flag (so deduplication turned off), we will get 3 results:
WITHOUT this replica flag (deduplication turned off), we will get 3 results:

* `up{job="prometheus",env="2",cluster="1",replica="A"} 1`
* `up{job="prometheus",env="2",cluster="1",replica="B"} 1`
* `up{job="prometheus",env="2",cluster="2",replica="A"} 1`

### The same output will be present for this example with multiple replica labels:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B`
* Prometheus + sidecar "A" in different cluster: `cluster=2,env=2,replica=A,replicaX=A`

```
$ thanos query \
--http-address "0.0.0.0:9090" \
--query.replica-label "replica" \
--query.replica-label "replicaX" \
--store "<store-api>:<grpc-port>" \
--store "<store-api2>:<grpc-port>" \
```


This logic can also be controlled via parameter on QueryAPI. More details below.

## Query API
Expand Down Expand Up @@ -89,14 +107,23 @@ Querier also allows to configure different timeouts:
If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response
strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response.

### Deduplication replica labels.

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `replicaLabels` | `[]string` | `query.replica-label` flag (default: empty). | `replicaLabels=replicaA&replicaLabels=replicaB` |
| | | | |

This overwrites the `query.replica-label` cli flag to allow dynamic replica labels at query time.

### Deduplication Enabled

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `dedup` | `Boolean` | True, but effect depends on `query.replica` configuration flag. | `1, t, T, TRUE, true, True` for "True" |
| | | | |

This controls if query should use `replica` label for deduplication or not.
This controls if query results should be deduplicated using the replica labels.

### Auto downsampling

Expand Down Expand Up @@ -236,8 +263,8 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.replica-label=QUERY.REPLICA-LABEL
Label to treat as a replica indicator along
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
able to query without deduplication using
'dedup=false' parameter.
Expand Down
1 change: 1 addition & 0 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ thanos query \
--store 1.2.3.4:19090 \
--store 1.2.3.5:19090 \
--query.replica-label replica # Replica label for de-duplication
--query.replica-label replicaX # Supports multiple replica labels for de-duplication
```

Go to the configured HTTP address, and you should now be able to query across all Prometheus instances and receive de-duplicated data.
Expand Down
44 changes: 39 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type API struct {

enableAutodownsampling bool
enablePartialResponse bool
replicaLabels []string
reg prometheus.Registerer
defaultInstantQueryMaxSourceResolution time.Duration

Expand All @@ -116,6 +117,7 @@ func NewAPI(
c query.QueryableCreator,
enableAutodownsampling bool,
enablePartialResponse bool,
replicaLabels []string,
defaultInstantQueryMaxSourceResolution time.Duration,
) *API {
return &API{
Expand All @@ -124,6 +126,7 @@ func NewAPI(
queryableCreate: c,
enableAutodownsampling: enableAutodownsampling,
enablePartialResponse: enablePartialResponse,
replicaLabels: replicaLabels,
reg: reg,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,

Expand Down Expand Up @@ -185,6 +188,21 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) {
const replicaLabelsParam = "replicaLabels[]"
if err := r.ParseForm(); err != nil {
return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")}
}

replicaLabels = api.replicaLabels
// Overwrite the cli flag when provided as a query parameter.
if len(r.Form[replicaLabelsParam]) > 0 {
replicaLabels = r.Form[replicaLabelsParam]
}

return replicaLabels, nil
}

func (api *API) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
Expand Down Expand Up @@ -212,6 +230,7 @@ func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialRespons
const partialResponseParam = "partial_response"
enablePartialResponse = api.enablePartialResponse

// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
var err error
enablePartialResponse, err = strconv.ParseBool(val)
Expand Down Expand Up @@ -255,6 +274,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -269,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -341,6 +365,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// If no max_source_resolution is specified fit at least 5 samples between steps.
maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step/5)
if apiErr != nil {
Expand All @@ -357,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -397,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -463,13 +492,18 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -572,7 +606,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
Loading

0 comments on commit f4757f5

Please sign in to comment.