Skip to content

Commit

Permalink
use tsdb storage package for the proxy
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev committed Sep 16, 2019
1 parent 5c4ae8c commit b22f7b8
Showing 1 changed file with 58 additions and 171 deletions.
229 changes: 58 additions & 171 deletions pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/http/httptest"
Expand All @@ -30,197 +29,85 @@ import (
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_labels "github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/component"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/testutil"
)

type testStoreServer struct {
storage.Queryable
}

func (s *testStoreServer) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
return nil, errors.New("not implemented")
}

func (s *testStoreServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
q, err := s.Querier(context.Background(), r.MinTime, r.MaxTime)
if err != nil {
return err
}
defer func() {
if errClose := q.Close(); errClose != nil {
err = errClose
}
}()

matchers, err := translateMatchers(r.Matchers)
seriesSets, warn, err := q.Select(&storage.SelectParams{}, matchers...)
if len(warn) != 0 {
return fmt.Errorf("querier selection contains warnings: %v", warn)
}
for seriesSets.Next() {
ss := seriesSets.At()
it := ss.Iterator()
samples := make([]sample, 0)

for it.Next() {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
}
if it.Err() != nil {
return it.Err()
}
resp, err := storeSeriesResponse(ss.Labels(), samples)
if err != nil {
return err
}
err = srv.Send(resp)
if err != nil {
return err
}
}
if seriesSets.Err() != nil {
return seriesSets.Err()
}
return nil
}

func (s *testStoreServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64)
if err != nil {
return nil, err
}
names, _, err := q.LabelNames()
if err != nil {
return nil, err
}
if err := q.Close(); err != nil {
return nil, err
}
return &storepb.LabelNamesResponse{Names: names}, nil
}
func (s *testStoreServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64)
if err != nil {
return nil, err
}
values, _, err := q.LabelValues(req.Label)
if err != nil {
return nil, err
}
if err := q.Close(); err != nil {
return nil, err
}
return &storepb.LabelValuesResponse{Values: values}, nil
}

type sample struct {
t int64
v float64
}

func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
switch m.Type {
case storepb.LabelMatcher_EQ:
return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value)
case storepb.LabelMatcher_NEQ:
return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value)
case storepb.LabelMatcher_RE:
return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value)
case storepb.LabelMatcher_NRE:
return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value)
}
return nil, fmt.Errorf("unknown label matcher type %d", m.Type)
}

func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) {
for _, m := range ms {
r, err := translateMatcher(m)
if err != nil {
return nil, err
}
res = append(res, r)
}
return res, nil
}

// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples.
func storeSeriesResponse(lset labels.Labels, smplChunks ...[]sample) (*storepb.SeriesResponse, error) {
var s storepb.Series
func TestEndpoints(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

for _, l := range lset {
s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value})
lbls := []tsdb_labels.Labels{
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric1"},
tsdb_labels.Label{Name: "foo", Value: "bar"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric1"},
tsdb_labels.Label{Name: "foo", Value: "boo"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric2"},
tsdb_labels.Label{Name: "foo", Value: "boo"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"},
tsdb_labels.Label{Name: "foo", Value: "bar"},
tsdb_labels.Label{Name: "replica", Value: "a"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"},
tsdb_labels.Label{Name: "foo", Value: "boo"},
tsdb_labels.Label{Name: "replica", Value: "a"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"},
tsdb_labels.Label{Name: "foo", Value: "boo"},
tsdb_labels.Label{Name: "replica", Value: "b"},
},
tsdb_labels.Labels{
tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"},
tsdb_labels.Label{Name: "foo", Value: "boo"},
tsdb_labels.Label{Name: "replica1", Value: "a"},
},
}

for _, smpls := range smplChunks {
c := chunkenc.NewXORChunk()
a, err := c.Appender()
if err != nil {
return nil, err
}

for _, smpl := range smpls {
a.Append(smpl.t, smpl.v)
}

ch := storepb.AggrChunk{
MinTime: smpls[0].t,
MaxTime: smpls[len(smpls)-1].t,
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()},
}

s.Chunks = append(s.Chunks, ch)
}
return storepb.NewSeriesResponse(&s), nil
}
db, err := testutil.NewTSDB()
defer func() { testutil.Ok(t, db.Close()) }()
testutil.Ok(t, err)
app := db.Appender()

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(dedup bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
testProxy := &testStoreServer{
Queryable: queryable,
for _, lbl := range lbls {
for i := int64(0); i < 10; i++ {
_, err := app.Add(lbl, i*60000, float64(i))
testutil.Ok(t, err)
}
return query.NewQueryableCreator(nil, testProxy)(dedup, replicaLabels, maxResolutionMillis, partialResponse)
}
}

func TestEndpoints(t *testing.T) {
suite, err := promql.NewTest(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
test_metric2{foo="boo"} 1+0x100
test_metric_replica1{foo="bar",replica="a"} 1+1x1
test_metric_replica1{foo="boo",replica="a"} 1+1x1
test_metric_replica1{foo="boo",replica="b"} 1+1x1
test_metric_replica1{foo="boo",replica1="a"} 1+1x1
`)
if err != nil {
t.Fatal(err)
}
defer suite.Close()

if err := suite.Run(); err != nil {
t.Fatal(err)
}
testutil.Ok(t, app.Commit())
store := store.NewTSDBStore(nil, nil, db, component.Query, nil)

now := time.Now()

api := &API{
queryableCreate: testQueryableCreator(suite.Storage()),
queryEngine: suite.QueryEngine(),
now: func() time.Time { return now },
queryableCreate: query.NewQueryableCreator(nil, store),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxConcurrent: 20,
MaxSamples: 10000,
Timeout: 100 * time.Second,
}),
now: func() time.Time { return now },
}

start := time.Unix(0, 0)
Expand Down

0 comments on commit b22f7b8

Please sign in to comment.