Skip to content

Commit

Permalink
querier: Add a flag to limit time range for metadata APIs (Labels and…
Browse files Browse the repository at this point in the history
… Series) (#3147)

* Limit the default time window for label name and label value APIs

Signed-off-by: Kemal Akkoyun <[email protected]>

* Keep the existing behaviour as default

Signed-off-by: Kemal Akkoyun <[email protected]>

* Add label API endpoint tests

Signed-off-by: Kemal Akkoyun <[email protected]>

* Fix changelog entry

Signed-off-by: Kemal Akkoyun <[email protected]>

* Address review issues

Implement time ranges for series API

Signed-off-by: Kemal Akkoyun <[email protected]>

* Update changelog

Signed-off-by: Kemal Akkoyun <[email protected]>

* Fix formatting

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun authored Sep 21, 2020
1 parent 16e06d5 commit 68d0ad2
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 180 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs.
- [#3154](https://github.com/thanos-io/thanos/pull/3154) Query Frontend: Add metric `thanos_memcached_getmulti_gate_queries_max`.
- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.
- [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.

### Changed

- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`.
- [#3154](https://github.com/thanos-io/thanos/pull/3154) Query: Add metric `thanos_query_gate_queries_max`. Remove metric `thanos_query_concurrent_selects_gate_queries_in_flight`.
- [#3154](https://github.com/thanos-io/thanos/pull/3154) Querier: Add metric `thanos_query_gate_queries_max`. Remove metric `thanos_query_concurrent_selects_gate_queries_in_flight`.
- [#3154](https://github.com/thanos-io/thanos/pull/3154) Store: Rename metric `thanos_bucket_store_queries_concurrent_max` to `thanos_bucket_store_series_gate_queries_max`.
- [#3179](https://github.com/thanos-io/thanos/pull/3179) Store: context.Canceled will not increase `thanos_objstore_bucket_operation_failures_total`.

Expand Down
6 changes: 6 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/thanos/pkg/extkingpin"

v1 "github.com/thanos-io/thanos/pkg/api/query"
Expand Down Expand Up @@ -81,6 +82,8 @@ func registerQuery(app *extkingpin.App) {

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

defaultMetadataTimeRange := cmd.Flag("query.metadata.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.").Default("0s").Duration()

selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()

Expand Down Expand Up @@ -193,6 +196,7 @@ func registerQuery(app *extkingpin.App) {
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*defaultMetadataTimeRange,
*strictStores,
component.Query,
)
Expand Down Expand Up @@ -241,6 +245,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
defaultMetadataTimeRange time.Duration,
strictStores []string,
comp component.Component,
) error {
Expand Down Expand Up @@ -442,6 +447,7 @@ func runQuery(
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
defaultMetadataTimeRange,
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/user"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -28,7 +30,6 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/weaveworks/common/user"
)

type queryFrontendConfig struct {
Expand Down
6 changes: 6 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ Flags:
able to query without deduplication using
'dedup=false' parameter. Data includes time
series, recording rules, and alerting rules.
--query.metadata.default-time-range=0s
The default metadata time range duration for
retrieving labels through Labels and Series API
when the range parameters are not specified.
The zero value means range covers the time
since the beginning.
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
Expand Down
159 changes: 84 additions & 75 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ type QueryAPI struct {
enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
replicaLabels []string

storeSet *query.StoreSet
replicaLabels []string
storeSet *query.StoreSet

defaultInstantQueryMaxSourceResolution time.Duration
defaultMetadataTimeRange time.Duration
}

// NewQueryAPI returns an initialized QueryAPI type.
Expand All @@ -89,6 +91,7 @@ func NewQueryAPI(
replicaLabels []string,
flagsMap map[string]string,
defaultInstantQueryMaxSourceResolution time.Duration,
defaultMetadataTimeRange time.Duration,
gate gate.Gate,
) *QueryAPI {
return &QueryAPI{
Expand All @@ -105,6 +108,7 @@ func NewQueryAPI(
replicaLabels: replicaLabels,
storeSet: storeSet,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
defaultMetadataTimeRange: defaultMetadataTimeRange,
}
}

Expand Down Expand Up @@ -418,18 +422,10 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid label name: %q", name)}
}

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
start, end, err := parseMetadataTimeRange(r, qapi.defaultMetadataTimeRange)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
Expand Down Expand Up @@ -462,11 +458,6 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return vals, warnings, nil
}

var (
minTime = time.Unix(math.MinInt64/1000+62135596801, 0)
maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
)

func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError) {
if err := r.ParseForm(); err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
Expand All @@ -476,18 +467,10 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")}
}

start, err := parseTimeParam(r, "start", minTime)
start, end, err := parseMetadataTimeRange(r, qapi.defaultMetadataTimeRange)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var matcherSets [][]*labels.Matcher
for _, s := range r.Form["match[]"] {
Expand Down Expand Up @@ -543,59 +526,11 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return metrics, set.Warnings(), nil
}

func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) {
val := r.FormValue(paramName)
if val == "" {
return defaultValue, nil
}
result, err := parseTime(val)
if err != nil {
return time.Time{}, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s)
}

func parseDuration(s string) (time.Duration, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", s)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return time.Duration(d), nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}

func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError) {
ctx := r.Context()

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
start, end, err := parseMetadataTimeRange(r, qapi.defaultMetadataTimeRange)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
Expand All @@ -607,7 +542,8 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -659,3 +595,76 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(
return groups, warnings, nil
}
}

var (
infMinTime = time.Unix(math.MinInt64/1000+62135596801, 0)
infMaxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
)

func parseMetadataTimeRange(r *http.Request, defaultMetadataTimeRange time.Duration) (time.Time, time.Time, error) {
// If start and end time not specified as query parameter, we get the range from the beginning of time by default.
var defaultStartTime, defaultEndTime time.Time
if defaultMetadataTimeRange == 0 {
defaultStartTime = infMinTime
defaultEndTime = infMaxTime
} else {
now := time.Now()
defaultStartTime = now.Add(-defaultMetadataTimeRange)
defaultEndTime = now
}

start, err := parseTimeParam(r, "start", defaultStartTime)
if err != nil {
return time.Time{}, time.Time{}, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", defaultEndTime)
if err != nil {
return time.Time{}, time.Time{}, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
return time.Time{}, time.Time{}, &api.ApiError{
Typ: api.ErrorBadData,
Err: errors.New("end timestamp must not be before start time"),
}
}

return start, end, nil
}

func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) {
val := r.FormValue(paramName)
if val == "" {
return defaultValue, nil
}
result, err := parseTime(val)
if err != nil {
return time.Time{}, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s)
}

func parseDuration(s string) (time.Duration, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", s)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return time.Duration(d), nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}
Loading

0 comments on commit 68d0ad2

Please sign in to comment.