diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ce8c27c1e..1c3b566ff7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver - [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples. - [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag). +- [#905](https://github.com/improbable-eng/thanos/pull/905) New Query API: /api/v1/labels. Noticed that the API was added in Prometheus v2.6. New options: diff --git a/go.sum b/go.sum index e8269186b4..40dc119b1b 100644 --- a/go.sum +++ b/go.sum @@ -239,7 +239,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.2.3-0.20181014000028-04af85275a5c/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tdewolff/minify/v2 v2.3.7/go.mod h1:DD1stRlSx6JsHfl1+E/HVMQeXiec9rD1UQ0epklIZLc= github.com/tdewolff/parse/v2 v2.3.5/go.mod h1:HansaqmN4I/U7L6/tUp0NcwT2tFO0F4EAWYGSDzkYNk= diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index b05a109207..55ece3c853 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -176,6 +176,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/label/:name/values", instr("label_values", api.labelValues)) r.Get("/series", instr("series", api.series)) + + r.Get("/labels", instr("label_names", api.labelNames)) } type queryData struct { @@ -614,3 +616,35 @@ func parseDuration(s string) (time.Duration, error) { } return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } + +func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { + ctx := r.Context() + + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + var ( + warnmtx sync.Mutex + warnings []error + ) + warningReporter := func(err error) { + warnmtx.Lock() + warnings = append(warnings, err) + warnmtx.Unlock() + } + + q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames") + + names, err := q.LabelNames() + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + + return names, warnings, nil +} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 819ff3ac2a..07dc3ec71b 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) { } // LabelNames returns all the unique label names present in the block in sorted order. -// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. func (q *querier) LabelNames() ([]string, error) { - return nil, errors.New("not implemented") + span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") + defer span.Finish() + + resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse}) + if err != nil { + return nil, errors.Wrap(err, "proxy LabelNames()") + } + + for _, w := range resp.Warnings { + q.warningReporter(errors.New(w)) + } + + return resp.Names, nil } func (q *querier) Close() error { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8e6c3533c5..b75e16e58c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -856,8 +856,38 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { } // LabelNames implements the storepb.StoreServer interface. -func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + g, gctx := errgroup.WithContext(ctx) + + s.mtx.RLock() + + var mtx sync.Mutex + var sets [][]string + + for _, b := range s.blocks { + indexr := b.indexReader(gctx) + g.Go(func() error { + defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") + + res := indexr.LabelNames() + sort.Strings(res) + + mtx.Lock() + sets = append(sets, res) + mtx.Unlock() + + return nil + }) + } + + s.mtx.RUnlock() + + if err := g.Wait(); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &storepb.LabelNamesResponse{ + Names: strutil.MergeSlices(sets...), + }, nil } // LabelValues implements the storepb.StoreServer interface. @@ -1616,6 +1646,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string { return res } +// LabelNames returns a list of label names. +func (r *bucketIndexReader) LabelNames() []string { + res := make([]string, 0, len(r.block.lvals)) + for ln, _ := range r.block.lvals { + res = append(res, ln) + } + return res +} + // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e8fd677f53..ea3482a19b 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -29,6 +29,14 @@ import ( "google.golang.org/grpc/status" ) +var statusToCode = map[int]codes.Code{ + http.StatusBadRequest: codes.InvalidArgument, + http.StatusNotFound: codes.NotFound, + http.StatusUnprocessableEntity: codes.Internal, + http.StatusServiceUnavailable: codes.Unavailable, + http.StatusInternalServerError: codes.Internal, +} + // PrometheusStore implements the store node API on top of the Prometheus remote read API. type PrometheusStore struct { logger log.Logger @@ -336,10 +344,52 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { } // LabelNames returns all known label names. -func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + u := *p.base + u.Path = path.Join(u.Path, "/api/v1/labels") + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]") + defer span.Finish() + + resp, err := p.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + + if resp.StatusCode/100 != 2 { + return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + } + + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelNamesResponse{Names: []string{}}, nil + } + + var m struct { + Data []string `json:"data"` + Status string `json:"status"` + Error string `json:"error"` + } + if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if m.Status != "success" { + code, exists := statusToCode[resp.StatusCode] + if !exists { + return nil, status.Error(codes.Internal, m.Error) + } + return nil, status.Error(code, m.Error) + } + + return &storepb.LabelNamesResponse{Names: m.Data}, nil } // LabelValues returns all known label values for a given label name. @@ -356,7 +406,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } span, ctx := tracing.StartSpan(ctx, "/prom_label_values HTTP[client]") @@ -364,17 +414,36 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue resp, err := p.client.Do(req.WithContext(ctx)) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body") + if resp.StatusCode/100 != 2 { + return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + } + + if resp.StatusCode == http.StatusNoContent { + return &storepb.LabelValuesResponse{Values: []string{}}, nil + } + var m struct { - Data []string `json:"data"` + Data []string `json:"data"` + Status string `json:"status"` + Error string `json:"error"` } if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } + sort.Strings(m.Data) + if m.Status != "success" { + code, exists := statusToCode[resp.StatusCode] + if !exists { + return nil, status.Error(codes.Internal, m.Error) + } + return nil, status.Error(code, m.Error) + } + return &storepb.LabelValuesResponse{Values: m.Data}, nil } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 9324d62247..bbcf8fdfa1 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -397,7 +397,48 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + var ( + warnings []string + names [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + ) + + for _, st := range s.stores() { + st := st + g.Go(func() error { + resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ + PartialResponseDisabled: r.PartialResponseDisabled, + }) + if err != nil { + err = errors.Wrapf(err, "fetch label names from store %s", st) + if r.PartialResponseDisabled { + return err + } + + mtx.Lock() + warnings = append(warnings, err.Error()) + mtx.Unlock() + return nil + } + + mtx.Lock() + warnings = append(warnings, resp.Warnings...) + names = append(names, resp.Names) + mtx.Unlock() + + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + return &storepb.LabelNamesResponse{ + Names: strutil.MergeUnsortedSlices(names...), + Warnings: warnings, + }, nil } // LabelValues returns all known label values for a given label name. @@ -415,7 +456,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ store := st g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ - Label: r.Label, + Label: r.Label, PartialResponseDisabled: r.PartialResponseDisabled, }) if err != nil { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index ed1c1d3b0a..caaf6ff8bc 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -687,6 +687,113 @@ func TestProxyStore_LabelValues(t *testing.T) { testutil.Equals(t, 1, len(resp.Warnings)) } +func TestProxyStore_LabelNames(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + for _, tc := range []struct { + title string + storeAPIs []Client + + req *storepb.LabelNamesRequest + + expectedNames []string + expectedErr error + expectedWarningsLen int + }{ + { + title: "label_names partial response disabled", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "c", "d"}, + }, + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + }, + expectedNames: []string{"a", "b", "c", "d"}, + expectedWarningsLen: 0, + }, + { + title: "label_names partial response disabled, but returns error", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + }, + expectedErr: errors.New("fetch label names from store test: error!"), + }, + { + title: "label_names partial response enabled", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + }, + }, + req: &storepb.LabelNamesRequest{ + PartialResponseDisabled: false, + }, + expectedNames: []string{"a", "b"}, + expectedWarningsLen: 1, + }, + } { + if ok := t.Run(tc.title, func(t *testing.T) { + q := NewProxyStore( + nil, + func() []Client { return tc.storeAPIs }, + component.Query, + nil, + 0*time.Second, + ) + + ctx := context.Background() + resp, err := q.LabelNames(ctx, tc.req) + if tc.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) + + testutil.Equals(t, tc.expectedNames, resp.Names) + testutil.Equals(t, tc.expectedWarningsLen, len(resp.Warnings), "got %v", resp.Warnings) + }); !ok { + return + } + } +} + type rawSeries struct { lset []storepb.Label samples []sample @@ -830,11 +937,13 @@ func (s *storeSeriesServer) Context() context.Context { type mockedStoreAPI struct { RespSeries []*storepb.SeriesResponse RespLabelValues *storepb.LabelValuesResponse + RespLabelNames *storepb.LabelNamesResponse RespError error RespDuration time.Duration LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest + LastLabelNamesReq *storepb.LabelNamesRequest } func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) { @@ -848,7 +957,9 @@ func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") + s.LastLabelNamesReq = req + + return s.RespLabelNames, s.RespError } func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 1a5b5f820b..bf43d1e975 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -165,10 +165,20 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb. } // LabelNames returns all known label names. -func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + q, err := s.db.Querier(math.MinInt64, math.MaxInt64) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") + + res, err := q.LabelNames() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &storepb.LabelNamesResponse{Names: res}, nil } // LabelValues returns all known label values for a given label name.