From ab5488b47fc8e58707673ba339cabce7517963ae Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 13 Sep 2019 11:38:07 +0300 Subject: [PATCH] added api replica tests Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1_test.go | 361 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 359 insertions(+), 2 deletions(-) diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 586a9733895..4d02ae362b3 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" @@ -33,18 +34,164 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/compact" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) +type testStoreServer struct { + storage.Queryable +} + +func (s *testStoreServer) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { + return nil, errors.New("not implemented") +} + +func (s *testStoreServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { + q, err := s.Querier(context.Background(), r.MinTime, r.MaxTime) + if err != nil { + return err + } + defer func() { + if errClose := q.Close(); errClose != nil { + err = errClose + } + }() + + matchers, err := translateMatchers(r.Matchers) + seriesSets, warn, err := q.Select(&storage.SelectParams{}, matchers...) + if len(warn) != 0 { + return fmt.Errorf("querier selection contains warnings: %v", warn) + } + for seriesSets.Next() { + ss := seriesSets.At() + it := ss.Iterator() + samples := make([]sample, 0) + + for it.Next() { + t, v := it.At() + samples = append(samples, sample{t: t, v: v}) + } + if it.Err() != nil { + return it.Err() + } + resp, err := storeSeriesResponse(ss.Labels(), samples) + if err != nil { + return err + } + err = srv.Send(resp) + if err != nil { + return err + } + } + if seriesSets.Err() != nil { + return seriesSets.Err() + } + return nil +} + +func (s *testStoreServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) + if err != nil { + return nil, err + } + names, _, err := q.LabelNames() + if err != nil { + return nil, err + } + if err := q.Close();err != nil { + return nil, err + } + return &storepb.LabelNamesResponse{Names: names}, nil +} +func (s *testStoreServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) + if err != nil { + return nil, err + } + values, _, err := q.LabelValues(req.Label) + if err != nil { + return nil, err + } + if err := q.Close();err != nil { + return nil, err + } + return &storepb.LabelValuesResponse{Values: values}, nil +} + +type sample struct { + t int64 + v float64 +} + +func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) { + switch m.Type { + case storepb.LabelMatcher_EQ: + return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value) + case storepb.LabelMatcher_NEQ: + return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value) + case storepb.LabelMatcher_RE: + return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value) + case storepb.LabelMatcher_NRE: + return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value) + } + return nil, fmt.Errorf("unknown label matcher type %d", m.Type) +} + +func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) { + for _, m := range ms { + r, err := translateMatcher(m) + if err != nil { + return nil, err + } + res = append(res, r) + } + return res, nil +} + +// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. +func storeSeriesResponse(lset labels.Labels, smplChunks ...[]sample) (*storepb.SeriesResponse, error) { + var s storepb.Series + + for _, l := range lset { + s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) + } + + for _, smpls := range smplChunks { + c := chunkenc.NewXORChunk() + a, err := c.Appender() + if err != nil { + return nil, err + } + + for _, smpl := range smpls { + a.Append(smpl.t, smpl.v) + } + + ch := storepb.AggrChunk{ + MinTime: smpls[0].t, + MaxTime: smpls[len(smpls)-1].t, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + } + + s.Chunks = append(s.Chunks, ch) + } + return storepb.NewSeriesResponse(&s), nil +} + func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(_ bool, _ []string, _ int64, _ bool) storage.Queryable { - return queryable + return func(dedup bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { + testProxy := &testStoreServer{ + Queryable: queryable, + } + return query.NewQueryableCreator(nil, testProxy)(dedup, replicaLabels, maxResolutionMillis, partialResponse) } } @@ -54,6 +201,10 @@ func TestEndpoints(t *testing.T) { test_metric1{foo="bar"} 0+100x100 test_metric1{foo="boo"} 1+0x100 test_metric2{foo="boo"} 1+0x100 + test_metric_replica1{foo="bar",replica="a"} 1+1x1 + test_metric_replica1{foo="boo",replica="a"} 1+1x1 + test_metric_replica1{foo="boo",replica="b"} 1+1x1 + test_metric_replica1{foo="boo",replica1="a"} 1+1x1 `) if err != nil { t.Fatal(err) @@ -125,6 +276,211 @@ func TestEndpoints(t *testing.T) { }, }, }, + // Query endpoint without deduplication. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "b", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with single deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with multiple deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica", "replica1"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, { endpoint: api.query, query: url.Values{ @@ -269,6 +625,7 @@ func TestEndpoints(t *testing.T) { response: []string{ "test_metric1", "test_metric2", + "test_metric_replica1", }, }, {