From 34f3aed28eba87c054f6b09151004a700af012fe Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Mon, 16 Sep 2019 12:57:18 +0300 Subject: [PATCH] use tsdb storage package for the proxy Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1_test.go | 228 ++++++++++----------------------------- 1 file changed, 58 insertions(+), 170 deletions(-) diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 164b101d215..d6005d2bd82 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "io/ioutil" - "math" "math/rand" "net/http" "net/http/httptest" @@ -30,197 +29,86 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + tsdb_labels "github.com/prometheus/prometheus/tsdb/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/component" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" - "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/testutil" ) -type testStoreServer struct { - storage.Queryable -} - -func (s *testStoreServer) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { - return nil, errors.New("not implemented") -} - -func (s *testStoreServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - q, err := s.Querier(context.Background(), r.MinTime, r.MaxTime) - if err != nil { - return err - } - defer func() { - if errClose := q.Close(); errClose != nil { - err = errClose - } - }() - - matchers, err := translateMatchers(r.Matchers) - seriesSets, warn, err := q.Select(&storage.SelectParams{}, matchers...) - if len(warn) != 0 { - return fmt.Errorf("querier selection contains warnings: %v", warn) - } - for seriesSets.Next() { - ss := seriesSets.At() - it := ss.Iterator() - samples := make([]sample, 0) - - for it.Next() { - t, v := it.At() - samples = append(samples, sample{t: t, v: v}) - } - if it.Err() != nil { - return it.Err() - } - resp, err := storeSeriesResponse(ss.Labels(), samples) - if err != nil { - return err - } - err = srv.Send(resp) - if err != nil { - return err - } - } - if seriesSets.Err() != nil { - return seriesSets.Err() - } - return nil -} - -func (s *testStoreServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) - if err != nil { - return nil, err - } - names, _, err := q.LabelNames() - if err != nil { - return nil, err - } - if err := q.Close(); err != nil { - return nil, err - } - return &storepb.LabelNamesResponse{Names: names}, nil -} -func (s *testStoreServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) - if err != nil { - return nil, err - } - values, _, err := q.LabelValues(req.Label) - if err != nil { - return nil, err - } - if err := q.Close(); err != nil { - return nil, err - } - return &storepb.LabelValuesResponse{Values: values}, nil -} - -type sample struct { - t int64 - v float64 -} - -func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) { - switch m.Type { - case storepb.LabelMatcher_EQ: - return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value) - case storepb.LabelMatcher_NEQ: - return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value) - case storepb.LabelMatcher_RE: - return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value) - case storepb.LabelMatcher_NRE: - return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value) - } - return nil, fmt.Errorf("unknown label matcher type %d", m.Type) -} - -func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) { - for _, m := range ms { - r, err := translateMatcher(m) - if err != nil { - return nil, err - } - res = append(res, r) - } - return res, nil -} - -// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. -func storeSeriesResponse(lset labels.Labels, smplChunks ...[]sample) (*storepb.SeriesResponse, error) { - var s storepb.Series +func TestEndpoints(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() - for _, l := range lset { - s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) + lbls := []tsdb_labels.Labels{ + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric2"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "b"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica1", Value: "a"}, + }, } - for _, smpls := range smplChunks { - c := chunkenc.NewXORChunk() - a, err := c.Appender() - if err != nil { - return nil, err - } - - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) - } - - ch := storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - } - - s.Chunks = append(s.Chunks, ch) - } - return storepb.NewSeriesResponse(&s), nil -} + db, err := testutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) + app := db.Appender() -func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(dedup bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { - testProxy := &testStoreServer{ - Queryable: queryable, + for _, lbl := range lbls { + for i := int64(0); i < 10; i++ { + _, err := app.Add(lbl, i*60000, float64(i)) + testutil.Ok(t, err) } - return query.NewQueryableCreator(nil, testProxy)(dedup, replicaLabels, maxResolutionMillis, partialResponse) - } -} - -func TestEndpoints(t *testing.T) { - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar"} 0+100x100 - test_metric1{foo="boo"} 1+0x100 - test_metric2{foo="boo"} 1+0x100 - test_metric_replica1{foo="bar",replica="a"} 1+1x1 - test_metric_replica1{foo="boo",replica="a"} 1+1x1 - test_metric_replica1{foo="boo",replica="b"} 1+1x1 - test_metric_replica1{foo="boo",replica1="a"} 1+1x1 - `) - if err != nil { - t.Fatal(err) - } - defer suite.Close() - - if err := suite.Run(); err != nil { - t.Fatal(err) } + testutil.Ok(t, app.Commit()) + store := store.NewTSDBStore(nil, nil, db, component.Query, nil) now := time.Now() - api := &API{ - queryableCreate: testQueryableCreator(suite.Storage()), - queryEngine: suite.QueryEngine(), - now: func() time.Time { return now }, + queryableCreate: query.NewQueryableCreator(nil, store), + queryEngine: promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 20, + MaxSamples: 10000, + Timeout: 100 * time.Second, + }), + now: func() time.Time { return now }, } start := time.Unix(0, 0)