Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: allow multiple deduplication label. #1362

Merged
merged 17 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ Accepted into CNCF:
request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`.
- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings
- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided, it overrides the `query.replica-label` CLI flags.
- [#1395](https://github.com/thanos-io/thanos/pull/1395) Thanos Sidecar added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar.
- [#1297](https://github.com/thanos-io/thanos/pull/1297) Thanos Compact added `/-/ready` and `/-/healthy` endpoints to Thanos compact.
- [#1431](https://github.com/thanos-io/thanos/pull/1431) Thanos Query added hidden flag to allow the use of downsampled resolution data for instant queries.
- [#1408](https://github.com/thanos-io/thanos/pull/1408) Thanos Store Gateway can now allow the specifying of supported time ranges it will serve (time sharding). Flags: `min-time` & `max-time`

### Changed

- [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`.
- [#1414](https://github.com/thanos-io/thanos/pull/1413) Upgraded important dependencies: Prometheus to 2.12-rc.0. TSDB is now part of Prometheus.
- [#1380](https://github.com/thanos-io/thanos/pull/1380) Upgraded important dependencies: Prometheus to 2.11.1 and TSDB to 0.9.1. Some changes affecting Querier:
- [ENHANCEMENT] Query performance improvement: Efficient iteration and search in HashForLabels and HashWithoutLabels. #5707
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
String()
replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -146,7 +146,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
*replicaLabels,
selectorLset,
*stores,
*enableAutodownsampling,
Expand Down Expand Up @@ -264,7 +264,7 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
replicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
Expand Down Expand Up @@ -312,7 +312,7 @@ func runQuery(
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down Expand Up @@ -404,7 +404,7 @@ func runQuery(
ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, instantDefaultMaxSourceResolution)
api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

Expand Down
39 changes: 33 additions & 6 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ $ thanos query \
## Deduplication

The query layer can deduplicate series that were collected from high-availability pairs of data sources such as Prometheus.
A fixed replica label must be chosen for the entire cluster and can then be passed to query nodes on startup.
One or more fixed replica labels must be chosen for the entire cluster and can then be passed to query nodes on startup.

Two or more series that are only distinguished by the given replica label, will be merged into a single time series.
This also hides gaps in collection of a single data source. For example:
This also hides gaps in collection of a single data source.

### An example with a single replica label:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B`
Expand All @@ -47,12 +49,28 @@ And we query for metric `up{job="prometheus",env="2"}` with this option we will
* `up{job="prometheus",env="2",cluster="1"} 1`
* `up{job="prometheus",env="2",cluster="2"} 1`

WITHOUT this replica flag (so deduplication turned off), we will get 3 results:
WITHOUT this replica flag (deduplication turned off), we will get 3 results:

* `up{job="prometheus",env="2",cluster="1",replica="A"} 1`
* `up{job="prometheus",env="2",cluster="1",replica="B"} 1`
* `up{job="prometheus",env="2",cluster="2",replica="A"} 1`

### The same output will be present for this example with multiple replica labels:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B`
* Prometheus + sidecar "A" in different cluster: `cluster=2,env=2,replica=A,replicaX=A`

```
$ thanos query \
--http-address "0.0.0.0:9090" \
--query.replica-label "replica" \
--query.replica-label "replicaX" \
--store "<store-api>:<grpc-port>" \
--store "<store-api2>:<grpc-port>"
```


This logic can also be controlled via parameter on QueryAPI. More details below.

## Query API
Expand Down Expand Up @@ -89,14 +107,23 @@ Querier also allows to configure different timeouts:
If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response
strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response.

### Deduplication replica labels.

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `replicaLabels[]` | `[]string` | `query.replica-label` flag (default: empty). | `replicaLabels[]=replicaA&replicaLabels[]=replicaB` |
| | | | |

When provided, this overrides the `query.replica-label` CLI flag for the request, allowing replica labels to be chosen dynamically at query time.

### Deduplication Enabled

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `dedup` | `Boolean` | True, but effect depends on `query.replica-label` configuration flag. | `1, t, T, TRUE, true, True` for "True" |
| | | | |

This controls if query should use `replica` label for deduplication or not.
This controls if query results should be deduplicated using the replica labels.

### Auto downsampling

Expand Down Expand Up @@ -236,8 +263,8 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.replica-label=QUERY.REPLICA-LABEL
Label to treat as a replica indicator along
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
able to query without deduplication using
'dedup=false' parameter.
Expand Down
1 change: 1 addition & 0 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ thanos query \
--store 1.2.3.4:19090 \
--store 1.2.3.5:19090 \
--query.replica-label replica \
--query.replica-label replicaX # Replica labels for de-duplication; the flag can be repeated for multiple labels
```

Go to the configured HTTP address, and you should now be able to query across all Prometheus instances and receive de-duplicated data.
Expand Down
44 changes: 39 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type API struct {

enableAutodownsampling bool
enablePartialResponse bool
replicaLabels []string
reg prometheus.Registerer
defaultInstantQueryMaxSourceResolution time.Duration

Expand All @@ -116,6 +117,7 @@ func NewAPI(
c query.QueryableCreator,
enableAutodownsampling bool,
enablePartialResponse bool,
replicaLabels []string,
defaultInstantQueryMaxSourceResolution time.Duration,
) *API {
return &API{
Expand All @@ -124,6 +126,7 @@ func NewAPI(
queryableCreate: c,
enableAutodownsampling: enableAutodownsampling,
enablePartialResponse: enablePartialResponse,
replicaLabels: replicaLabels,
reg: reg,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,

Expand Down Expand Up @@ -185,6 +188,21 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

// parseReplicaLabelsParam picks the replica labels to use for the request r.
//
// If the request form carries at least one "replicaLabels[]" value, those
// values are returned. Otherwise the labels stored on the API (api.replicaLabels)
// are returned unchanged. A form that fails to parse yields an ErrorInternal
// ApiError and a nil slice.
func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) {
	const replicaLabelsParam = "replicaLabels[]"

	// r.Form is only populated once the form has been parsed.
	if err := r.ParseForm(); err != nil {
		return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")}
	}

	// Values supplied with the request take precedence over the cli flag.
	if fromRequest := r.Form[replicaLabelsParam]; len(fromRequest) > 0 {
		return fromRequest, nil
	}

	return api.replicaLabels, nil
}

func (api *API) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
Expand Down Expand Up @@ -212,6 +230,7 @@ func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialRespons
const partialResponseParam = "partial_response"
enablePartialResponse = api.enablePartialResponse

// Overwrite the cli flag when provided as a query parameter.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to put this below?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway I would be fine with just what we have in doc, so maybe we can drop this comment?

Copy link
Contributor Author

@krasi-georgiev krasi-georgiev Sep 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is the right place and it is for the partialResponseParam
I prefer to keep the comment here as I was a bit confused why this is done this way when I first saw it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you put it here then and not for parseDownsamplingParamMillis and parseReplicaLabelsParam? I don't get it ):

if val := r.FormValue(partialResponseParam); val != "" {
var err error
enablePartialResponse, err = strconv.ParseBool(val)
Expand Down Expand Up @@ -255,6 +274,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -269,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -341,6 +365,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// If no max_source_resolution is specified fit at least 5 samples between steps.
maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step/5)
if apiErr != nil {
Expand All @@ -357,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -397,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -463,13 +492,18 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -572,7 +606,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
Loading