Skip to content

Commit

Permalink
add time range parameters to labels API
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jul 31, 2020
1 parent ee52915 commit f5773fe
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 114 deletions.
82 changes: 52 additions & 30 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,9 @@ func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePa
}

func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiError) {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
ts = qapi.baseAPI.Now()
ts, err := parseTimeParam(r, "time", qapi.baseAPI.Now())
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

ctx := r.Context()
Expand Down Expand Up @@ -394,12 +388,27 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid label name: %q", name)}
}

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
// Prometheus doesn't check this, do we need to?
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -433,26 +442,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")}
}

var start time.Time
if t := r.FormValue("start"); t != "" {
var err error
start, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
start = minTime
start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var end time.Time
if t := r.FormValue("end"); t != "" {
var err error
end, err = parseTime(t)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
} else {
end = maxTime
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

var matcherSets [][]*labels.Matcher
Expand Down Expand Up @@ -504,6 +500,18 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return metrics, set.Warnings(), nil
}

func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) {
val := r.FormValue(paramName)
if val == "" {
return defaultValue, nil
}
result, err := parseTime(val)
if err != nil {
return time.Time{}, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
Expand Down Expand Up @@ -532,12 +540,26 @@ func parseDuration(s string) (time.Duration, error) {
func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError) {
ctx := r.Context()

start, err := parseTimeParam(r, "start", minTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := parseTimeParam(r, "end", maxTime)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,14 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []sto

// LabelNames returns all known label names. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand All @@ -599,9 +604,14 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL) ([]string,

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()

q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
Expand Down
13 changes: 11 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,12 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand All @@ -342,7 +347,11 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
End: q.maxt,
})
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
func (s *BucketStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()
Expand All @@ -1055,6 +1055,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesReque
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > r.End || b.meta.MaxTime < r.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
Expand Down Expand Up @@ -1091,6 +1094,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var sets [][]string

for _, b := range s.blocks {
if b.meta.MinTime > req.End || b.meta.MaxTime < req.Start {
continue
}
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ Outer:
}

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base)
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, r.Start, r.End)
if err != nil {
return nil, err
}
Expand All @@ -536,7 +536,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label)
vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Start, r.End)
if err != nil {
return nil, err
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,22 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{})
resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: minTime.Unix(),
End: maxTime.Unix(),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"a"}, resp.Names)

// Outside time range.
resp, err = proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
Start: maxTime.Add(-time.Second).Unix(),
End: maxTime.Unix(),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Names)
}

func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
Expand Down Expand Up @@ -413,10 +425,22 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: minTime.Unix(),
End: maxTime.Unix(),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{"a", "b", "c"}, resp.Values)

// Outside time range.
resp, err = proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: "a",
Start: maxTime.Add(-time.Second).Unix(),
End: maxTime.Unix(),
})
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), resp.Warnings)
testutil.Equals(t, []string{}, resp.Values)
}

// Test to check external label values retrieve.
Expand Down
4 changes: 4 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
Expand Down Expand Up @@ -610,6 +612,8 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
})
if err != nil {
err = errors.Wrapf(err, "fetch label values from store %s", store)
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,8 @@ func TestProxyStore_LabelValues(t *testing.T) {
req := &storepb.LabelValuesRequest{
Label: "a",
PartialResponseDisabled: true,
Start: minTime.Unix(),
End: maxTime.Unix(),
}
resp, err := q.LabelValues(ctx, req)
testutil.Ok(t, err)
Expand Down Expand Up @@ -1139,6 +1141,8 @@ func TestProxyStore_LabelNames(t *testing.T) {
},
},
req: &storepb.LabelNamesRequest{
Start: minTime.Unix(),
End: maxTime.Unix(),
PartialResponseDisabled: true,
},
expectedNames: []string{"a", "b", "c", "d"},
Expand All @@ -1161,6 +1165,8 @@ func TestProxyStore_LabelNames(t *testing.T) {
},
},
req: &storepb.LabelNamesRequest{
Start: minTime.Unix(),
End: maxTime.Unix(),
PartialResponseDisabled: true,
},
expectedErr: errors.New("fetch label names from store test: error!"),
Expand All @@ -1182,6 +1188,8 @@ func TestProxyStore_LabelNames(t *testing.T) {
},
},
req: &storepb.LabelNamesRequest{
Start: minTime.Unix(),
End: maxTime.Unix(),
PartialResponseDisabled: false,
},
expectedNames: []string{"a", "b"},
Expand Down
Loading

0 comments on commit f5773fe

Please sign in to comment.