Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: Add /api/v1/labels support #905

Merged
merged 9 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.
- [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag).
- [#905](https://github.com/improbable-eng/thanos/pull/905) New Query API: /api/v1/labels. Noticed that the API was added in Prometheus v2.6.

New options:

Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.3-0.20181014000028-04af85275a5c/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tdewolff/minify/v2 v2.3.7/go.mod h1:DD1stRlSx6JsHfl1+E/HVMQeXiec9rD1UQ0epklIZLc=
github.com/tdewolff/parse/v2 v2.3.5/go.mod h1:HansaqmN4I/U7L6/tUp0NcwT2tFO0F4EAWYGSDzkYNk=
Expand Down
34 changes: 34 additions & 0 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
r.Get("/label/:name/values", instr("label_values", api.labelValues))

r.Get("/series", instr("series", api.series))

r.Get("/labels", instr("label_names", api.labelNames))
}

type queryData struct {
Expand Down Expand Up @@ -614,3 +616,35 @@ func parseDuration(s string) (time.Duration, error) {
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}

func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
ctx := r.Context()

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
povilasv marked this conversation as resolved.
Show resolved Hide resolved
if apiErr != nil {
povilasv marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, apiErr
povilasv marked this conversation as resolved.
Show resolved Hide resolved
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}

return names, warnings, nil
}
15 changes: 13 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) {
}

// LabelNames returns all the unique label names present in the block in sorted order.
// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
func (q *querier) LabelNames() ([]string, error) {
return nil, errors.New("not implemented")
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
}

for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
}

return resp.Names, nil
}

func (q *querier) Close() error {
Expand Down
43 changes: 41 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,38 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at some point we need to make this more readable.

Either abstract away this or rename as I don't know now if we block Every gRPC call now or what. I needed to dive really deep to tell. But that's not for this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can create a ticket to record the issue.


var mtx sync.Mutex
var sets [][]string

for _, b := range s.blocks {
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")

res := indexr.LabelNames()
sort.Strings(res)

mtx.Lock()
sets = append(sets, res)
mtx.Unlock()

return nil
})
}

s.mtx.RUnlock()

if err := g.Wait(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &storepb.LabelNamesResponse{
Names: strutil.MergeSlices(sets...),
}, nil
}

// LabelValues implements the storepb.StoreServer interface.
Expand Down Expand Up @@ -1616,6 +1646,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string {
return res
}

// LabelNames returns a list of label names.
func (r *bucketIndexReader) LabelNames() []string {
res := make([]string, 0, len(r.block.lvals))
for ln, _ := range r.block.lvals {
res = append(res, ln)
}
return res
}

// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
Expand Down
81 changes: 75 additions & 6 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"google.golang.org/grpc/status"
)

var statusToCode = map[int]codes.Code{
http.StatusBadRequest: codes.InvalidArgument,
http.StatusNotFound: codes.NotFound,
http.StatusUnprocessableEntity: codes.Internal,
http.StatusServiceUnavailable: codes.Unavailable,
http.StatusInternalServerError: codes.Internal,
}

// PrometheusStore implements the store node API on top of the Prometheus remote read API.
type PrometheusStore struct {
logger log.Logger
Expand Down Expand Up @@ -336,10 +344,52 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label {
}

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
u := *p.base
u.Path = path.Join(u.Path, "/api/v1/labels")

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]")
povilasv marked this conversation as resolved.
Show resolved Hide resolved
defer span.Finish()

resp, err := p.client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
}

if resp.StatusCode == http.StatusNoContent {
return &storepb.LabelNamesResponse{Names: []string{}}, nil
}

var m struct {
Data []string `json:"data"`
Status string `json:"status"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if m.Status != "success" {
Copy link
Member

@povilasv povilasv Mar 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possability that both data and error are returned together? if yes then handle partial response, if no then close this :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is same for other requests as well we do here 💩 We need to agree on something consistently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@povilasv I will be double check the response from Prometheus. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@povilasv Yes, I had checked and updated the code at this moment below.

code, exists := statusToCode[resp.StatusCode]
if !exists {
return nil, status.Error(codes.Internal, m.Error)
}
return nil, status.Error(code, m.Error)
}

return &storepb.LabelNamesResponse{Names: m.Data}, nil
}

// LabelValues returns all known label values for a given label name.
Expand All @@ -356,25 +406,44 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

span, ctx := tracing.StartSpan(ctx, "/prom_label_values HTTP[client]")
defer span.Finish()

resp, err := p.client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
}

if resp.StatusCode == http.StatusNoContent {
return &storepb.LabelValuesResponse{Values: []string{}}, nil
}

var m struct {
Data []string `json:"data"`
Data []string `json:"data"`
Status string `json:"status"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

sort.Strings(m.Data)

if m.Status != "success" {
code, exists := statusToCode[resp.StatusCode]
if !exists {
return nil, status.Error(codes.Internal, m.Error)
}
return nil, status.Error(code, m.Error)
}

return &storepb.LabelValuesResponse{Values: m.Data}, nil
}
45 changes: 43 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,48 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher)
func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
var (
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
)

for _, st := range s.stores() {
st := st
g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
if r.PartialResponseDisabled {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this is the only valid partial response handling/triggering in this PR (:

return err
}

mtx.Lock()
warnings = append(warnings, err.Error())
mtx.Unlock()
return nil
}

mtx.Lock()
warnings = append(warnings, resp.Warnings...)
names = append(names, resp.Names)
mtx.Unlock()

return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsorted ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the names from all stores could not be guaranteed sorted, so should we made a sort in all stores or just do it in proxy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HMM prometheus docs don't state the need for sorting, but all examples are sorted so I guess let's do sorting. I would do it in proxy :)

https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MergeUnsortedSlices would do sorting if the slices were not ordered yet. so the method is ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow the name is so confusing then! ;p

Warnings: warnings,
}, nil
}

// LabelValues returns all known label values for a given label name.
Expand All @@ -415,7 +456,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
store := st
g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
Label: r.Label,
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
Expand Down
Loading