Skip to content

Commit

Permalink
Added no-chunk option in series API (#1904)
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>

add e2e tests

Signed-off-by: yeya24 <[email protected]>

add tests for all stores

Signed-off-by: yeya24 <[email protected]>

add changelog

Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 authored and bwplotka committed Jan 3, 2020
1 parent bb346c0 commit d9764fb
Show file tree
Hide file tree
Showing 18 changed files with 664 additions and 245 deletions.
Empty file removed .dep-finished
Empty file.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts.
- [#1881](https://github.com/thanos-io/thanos/pull/1881) Store Gateway: memcached support for index cache. See [documentation](docs/components/store.md/#index-cache) for further information.
- [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a skip-chunks option in Store Series API to improve the response time of `/api/v1/series` endpoint.

## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03

Expand Down
10 changes: 5 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -426,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -503,7 +503,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
Expand Down Expand Up @@ -607,7 +607,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
}
}
}
Expand All @@ -42,11 +43,12 @@ type queryable struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil
}

type querier struct {
Expand All @@ -59,6 +61,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -72,6 +75,7 @@ func newQuerier(
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
skipChunks bool,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -93,6 +97,7 @@ func newQuerier(
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
}
}

Expand Down Expand Up @@ -185,6 +190,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Series()")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, testProxy)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, oneHourMillis, false)
queryable := queryableCreator(false, nil, oneHourMillis, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
},
}

q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false)
q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false, false)

engine := promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestQuerier_Series(t *testing.T) {

// Querier clamps the range to [1,300], which should drop some samples of the result above.
// The store API allows endpoints to send more data then initially requested.
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true)
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true, false)
defer func() { testutil.Ok(t, q.Close()) }()

res, _, err := q.Select(&storage.SelectParams{})
Expand Down
13 changes: 9 additions & 4 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
for set.Next() {
var series storepb.Series

series.Labels, series.Chunks = set.At()

stats.mergedSeriesCount++
stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))

if req.SkipChunks {
series.Labels, _ = set.At()
} else {
series.Labels, series.Chunks = set.At()

stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}

if err := srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
Expand Down
34 changes: 31 additions & 3 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

// TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching.
for i, tcase := range []struct {
req *storepb.SeriesRequest
expected [][]storepb.Label
req *storepb.SeriesRequest
expected [][]storepb.Label
expectedChunkLen int
}{
{
req: &storepb.SeriesRequest{
Expand All @@ -184,6 +185,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -203,6 +205,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -218,6 +221,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -233,6 +237,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -252,6 +257,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -271,6 +277,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -286,6 +293,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
Expand All @@ -309,6 +317,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -324,6 +333,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -347,6 +357,24 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
// Test no-chunk option.
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
SkipChunks: true,
},
expectedChunkLen: 0,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
} {
t.Log("Run ", i)

Expand All @@ -357,7 +385,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

for i, s := range srv.SeriesSet {
testutil.Equals(t, tcase.expected[i], s.Labels)
testutil.Equals(t, 3, len(s.Chunks))
testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/store/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,21 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er
}
return res, nil
}

// matchersToString converts label matchers to string format.
func matchersToString(ms []storepb.LabelMatcher) (string, error) {
var res string
matchers, err := translateMatchers(ms)
if err != nil {
return "", err
}

for i, m := range matchers {
res += m.String()
if i < len(matchers)-1 {
res += ", "
}
}

return "{" + res + "}", nil
}
86 changes: 86 additions & 0 deletions pkg/store/matchers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package store

import (
"testing"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMatchersToString(t *testing.T) {
cases := []struct {
ms []storepb.LabelMatcher
expected string
}{
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
}},
expected: `{__name__="up"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_NEQ,
Value: "up",
},
{
Name: "job",
Type: storepb.LabelMatcher_EQ,
Value: "test",
},
},
expected: `{__name__!="up", job="test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
},
{
Name: "job",
Type: storepb.LabelMatcher_RE,
Value: "test",
},
},
expected: `{__name__="up", job=~"test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "job",
Type: storepb.LabelMatcher_NRE,
Value: "test",
}},
expected: `{job!~"test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
},
{
Name: "__name__",
Type: storepb.LabelMatcher_NEQ,
Value: "up",
},
},
// We cannot use up{__name__!="up"} in this case.
expected: `{__name__="up", __name__!="up"}`,
},
}

for i, c := range cases {
actual, err := matchersToString(c.ms)
testutil.Ok(t, err)
testutil.Assert(t, actual == c.expected, "test case %d failed, expected %s, actual %s", i, c.expected, actual)
}
}
Loading

0 comments on commit d9764fb

Please sign in to comment.