Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time range parameters to labels API #2964

Merged
merged 2 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .bingo/Variables.mk
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ $(MINIO): .bingo/minio.mod
@echo "(re)installing $(GOBIN)/minio-v0.0.0-20200527010300-cccf2de129da"
@cd .bingo && $(GO) build -modfile=minio.mod -o=$(GOBIN)/minio-v0.0.0-20200527010300-cccf2de129da "github.com/minio/minio"

PROMETHEUS_ARRAY := $(GOBIN)/prometheus-v2.4.3+incompatible $(GOBIN)/prometheus-v1.8.2-0.20200507164740-ecee9c8abfd1
PROMETHEUS_ARRAY := $(GOBIN)/prometheus-v2.4.3+incompatible $(GOBIN)/prometheus-v1.8.2-0.20200724121523-657ba532e42f
$(PROMETHEUS_ARRAY): .bingo/prometheus.mod .bingo/prometheus.1.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/prometheus-v2.4.3+incompatible"
@cd .bingo && $(GO) build -modfile=prometheus.mod -o=$(GOBIN)/prometheus-v2.4.3+incompatible "github.com/prometheus/prometheus/cmd/prometheus"
@echo "(re)installing $(GOBIN)/prometheus-v1.8.2-0.20200507164740-ecee9c8abfd1"
@cd .bingo && $(GO) build -modfile=prometheus.1.mod -o=$(GOBIN)/prometheus-v1.8.2-0.20200507164740-ecee9c8abfd1 "github.com/prometheus/prometheus/cmd/prometheus"
@echo "(re)installing $(GOBIN)/prometheus-v1.8.2-0.20200724121523-657ba532e42f"
@cd .bingo && $(GO) build -modfile=prometheus.1.mod -o=$(GOBIN)/prometheus-v1.8.2-0.20200724121523-657ba532e42f "github.com/prometheus/prometheus/cmd/prometheus"

PROMTOOL := $(GOBIN)/promtool-v1.8.2-0.20200522113006-f4dd45609a05
$(PROMTOOL): .bingo/promtool.mod
Expand Down
8 changes: 4 additions & 4 deletions .bingo/prometheus.1.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT

go 1.14

require github.com/prometheus/prometheus v1.8.2-0.20200507164740-ecee9c8abfd1 // cmd/prometheus
require github.com/prometheus/prometheus v1.8.2-0.20200724121523-657ba532e42f // cmd/prometheus

replace (
// Mitigation for: https://github.com/Azure/go-autorest/issues/414
github.com/Azure/go-autorest => github.com/Azure/go-autorest v12.3.0+incompatible
k8s.io/api => k8s.io/api v0.17.5
k8s.io/apimachinery => k8s.io/apimachinery v0.17.5
k8s.io/client-go => k8s.io/client-go v0.17.5
k8s.io/api => k8s.io/api v0.18.6
k8s.io/apimachinery => k8s.io/apimachinery v0.18.6
k8s.io/client-go => k8s.io/client-go v0.18.6
k8s.io/klog => github.com/simonpasquier/klog-gokit v0.1.0
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
)
2 changes: 1 addition & 1 deletion .bingo/variables.env
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ LICHE="${gobin}/liche-v0.0.0-20181124191719-2a2e6e56f6c6"

MINIO="${gobin}/minio-v0.0.0-20200527010300-cccf2de129da"

PROMETHEUS_ARRAY="${gobin}/prometheus-v2.4.3+incompatible${gobin}/prometheus-v1.8.2-0.20200507164740-ecee9c8abfd1"
PROMETHEUS_ARRAY="${gobin}/prometheus-v2.4.3+incompatible${gobin}/prometheus-v1.8.2-0.20200724121523-657ba532e42f"

PROMTOOL="${gobin}/promtool-v1.8.2-0.20200522113006-f4dd45609a05"

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2926](https://github.com/thanos-io/thanos/pull/2926) API: Add new blocks HTTP API to serve blocks metadata. The status endpoints (`/api/v1/status/flags`, `/api/v1/status/runtimeinfo` and `/api/v1/status/buildinfo`) are now available on all components with a HTTP API.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
- [#2865](https://github.com/thanos-io/thanos/pull/2865) ui: Migrate Thanos Ruler UI to React
- [#2964](https://github.com/thanos-io/thanos/pull/2964) Query: Add time range parameters to label APIs. Add `start` and `end` fields to Store API `LabelNamesRequest` and `LabelValuesRequest`.

### Changed

Expand Down
84 changes: 54 additions & 30 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,9 @@ func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePa
}

func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiError) {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
ts = qapi.baseAPI.Now()
ts, err := parseTimeParam(r, "time", qapi.baseAPI.Now())
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

ctx := r.Context()
Expand Down Expand Up @@ -425,12 +419,26 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid label name: %q", name)}
}

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -464,26 +472,17 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")}
}

var start time.Time
if t := r.FormValue("start"); t != "" {
var err error
start, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
start = minTime
start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var end time.Time
if t := r.FormValue("end"); t != "" {
var err error
end, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
end = maxTime
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var matcherSets [][]*labels.Matcher
Expand Down Expand Up @@ -540,6 +539,18 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return metrics, set.Warnings(), nil
}

func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) {
val := r.FormValue(paramName)
if val == "" {
return defaultValue, nil
}
result, err := parseTime(val)
if err != nil {
return time.Time{}, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
Expand Down Expand Up @@ -568,6 +579,19 @@ func parseDuration(s string) (time.Duration, error) {
func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError) {
ctx := r.Context()

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -578,7 +602,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,14 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []sto

// LabelNames returns all known label names. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand All @@ -599,9 +604,14 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string,

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand Down
13 changes: 11 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,12 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand All @@ -351,7 +356,11 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()
Expand All @@ -1055,6 +1055,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesReque
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > r.End || b.meta.MaxTime < r.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
Expand Down Expand Up @@ -1091,6 +1094,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > req.End || b.meta.MaxTime < req.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
Expand Down
74 changes: 72 additions & 2 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{Label: "a"})
vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"1", "2"}, vals.Values)

Expand Down Expand Up @@ -381,7 +385,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
// Test no-chunk option.
// Test skip-chunk option.
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
Expand Down Expand Up @@ -599,3 +603,69 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
})
}
}

func TestBucketStore_LabelNames_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "test_bucketstore_label_names_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"a", "b", "c"}, vals.Names)

// Outside the time range.
vals, err = s.store.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: timestamp.FromTime(time.Now().Add(-24 * time.Hour)),
End: timestamp.FromTime(time.Now().Add(-23 * time.Hour)),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals.Names)
})
}

func TestBucketStore_LabelValues_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "test_bucketstore_label_values_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
})
testutil.Ok(t, err)
testutil.Equals(t, []string{"1", "2"}, vals.Values)

// Outside the time range.
vals, err = s.store.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: timestamp.FromTime(time.Now().Add(-24 * time.Hour)),
End: timestamp.FromTime(time.Now().Add(-23 * time.Hour)),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals.Values)
})
}
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ Outer:
}

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base)
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, r.Start, r.End)
if err != nil {
return nil, err
}
Expand All @@ -536,7 +536,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label)
vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Start, r.End)
if err != nil {
return nil, err
}
Expand Down
Loading