From bf4cdf3c7c93e1e66d7a8104e15f7f9c484e3545 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 31 Jul 2019 17:18:44 +0300 Subject: [PATCH 01/14] allow multiple deduplication label. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- cmd/thanos/query.go | 10 +-- pkg/query/iter.go | 10 +-- pkg/query/querier.go | 37 ++++---- pkg/query/querier_test.go | 177 +++++++++++++++++++++++++++----------- 4 files changed, 156 insertions(+), 78 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 3db672a5b8..7ed348731f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -64,8 +64,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() - replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). - String() + replicaLabels := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). + Strings() selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() @@ -145,7 +145,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string *maxConcurrentQueries, time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), - *replicaLabel, + *replicaLabels, selectorLset, *stores, *enableAutodownsampling, @@ -262,7 +262,7 @@ func runQuery( maxConcurrentQueries int, queryTimeout time.Duration, storeResponseTimeout time.Duration, - replicaLabel string, + replicaLabels []string, selectorLset labels.Labels, storeAddrs []string, enableAutodownsampling bool, @@ -309,7 +309,7 @@ func runQuery( unhealthyStoreTimeout, ) proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) - queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) + queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabels) engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 0f16185e17..a4ca1d8569 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -292,8 +292,8 @@ func (it *chunkSeriesIterator) Err() error { } type dedupSeriesSet struct { - set storage.SeriesSet - replicaLabel string + set storage.SeriesSet + replicaLabels map[string]struct{} replicas []storage.Series lset labels.Labels @@ -301,8 +301,8 @@ type dedupSeriesSet struct { ok bool } -func newDedupSeriesSet(set storage.SeriesSet, replicaLabel string) storage.SeriesSet { - s := &dedupSeriesSet{set: set, replicaLabel: replicaLabel} +func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}) storage.SeriesSet { + s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels} s.ok = s.set.Next() if s.ok { s.peek = s.set.At() @@ -325,7 +325,7 @@ func (s *dedupSeriesSet) Next() bool { // replica label if it exists func (s *dedupSeriesSet) peekLset() labels.Labels { lset := s.peek.Labels() - if lset[len(lset)-1].Name != s.replicaLabel { + if _, ok := s.replicaLabels[lset[len(lset)-1].Name]; !ok { return lset } return lset[:len(lset)-1] diff --git a/pkg/query/querier.go b/pkg/query/querier.go index df66401ff0..21acfb04cb 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -21,17 +21,17 @@ import ( type WarningReporter func(error) // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. -// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default. +// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable // NewQueryableCreator creates QueryableCreator. -func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator { +func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabels []string) QueryableCreator { return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable { return &queryable{ logger: logger, - replicaLabel: replicaLabel, + replicaLabels: replicaLabels, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -43,7 +43,7 @@ func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLa type queryable struct { logger log.Logger - replicaLabel string + replicaLabels []string proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -53,7 +53,7 @@ type queryable struct { // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil } type querier struct { @@ -61,7 +61,7 @@ type querier struct { logger log.Logger cancel func() mint, maxt int64 - replicaLabel string + replicaLabels map[string]struct{} proxy storepb.StoreServer deduplicate bool maxResolutionMillis int64 @@ -75,7 +75,7 @@ func newQuerier( ctx context.Context, logger log.Logger, mint, maxt int64, - replicaLabel string, + replicaLabels []string, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, @@ -89,13 +89,18 @@ func newQuerier( warningReporter = func(error) {} } ctx, cancel := context.WithCancel(ctx) + + rl := make(map[string]struct{}) + for _, replicaLabel := range replicaLabels { + rl[replicaLabel] = struct{}{} + } return &querier{ ctx: ctx, logger: logger, cancel: cancel, mint: mint, maxt: maxt, - replicaLabel: replicaLabel, + replicaLabels: rl, proxy: proxy, deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, @@ -105,7 +110,7 @@ func newQuerier( } func (q *querier) isDedupEnabled() bool { - return q.deduplicate && q.replicaLabel != "" + return q.deduplicate && len(q.replicaLabels) > 0 } type seriesServer struct { @@ -209,7 +214,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // TODO(fabxc): this could potentially pushed further down into the store API // to make true streaming possible. - sortDedupLabels(resp.seriesSet, q.replicaLabel) + sortDedupLabels(resp.seriesSet, q.replicaLabels) set := promSeriesSet{ mint: q.mint, @@ -221,19 +226,19 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabel), nil, nil + return newDedupSeriesSet(set, q.replicaLabels), nil, nil } -// sortDedupLabels resorts the set so that the same series with different replica +// sortDedupLabels re-sorts the set so that the same series with different replica // labels are coming right after each other. -func sortDedupLabels(set []storepb.Series, replicaLabel string) { +func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) { for _, s := range set { - // Move the replica label to the very end. + // Move the replica labels to the very end. sort.Slice(s.Labels, func(i, j int) bool { - if s.Labels[i].Name == replicaLabel { + if _, ok := replicaLabels[s.Labels[i].Name]; ok { return false } - if s.Labels[j].Name == replicaLabel { + if _, ok := replicaLabels[s.Labels[j].Name]; ok { return true } return s.Labels[i].Name < s.Labels[j].Name diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 56c9411631..58a137750b 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -25,7 +25,7 @@ import ( func TestQueryableCreator_MaxResolution(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} - queryableCreator := NewQueryableCreator(nil, testProxy, "test") + queryableCreator := NewQueryableCreator(nil, testProxy, []string{"test"}) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) queryable := queryableCreator(false, oneHourMillis, false, func(err error) {}) @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy, "")(false, 9999999, false, nil) + q := NewQueryableCreator(nil, testProxy, []string{""})(false, 9999999, false, nil) engine := promql.NewEngine( promql.EngineOpts{ @@ -172,7 +172,7 @@ func TestQuerier_Series(t *testing.T) { // Querier clamps the range to [1,300], which should drop some samples of the result above. // The store API allows endpoints to send more data then initially requested. - q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil) + q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true, nil) defer func() { testutil.Ok(t, q.Close()) }() res, _, err := q.Select(&storage.SelectParams{}) @@ -216,56 +216,129 @@ func TestQuerier_Series(t *testing.T) { func TestSortReplicaLabel(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - set := []storepb.Series{ - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "3"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "3"}, - {Name: "d", Value: "4"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-1"}, - {Name: "c", Value: "4"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "replica-2"}, - {Name: "c", Value: "3"}, - }}, + tests := []struct { + Input []storepb.Series + Exp []storepb.Series + DedupLabels map[string]struct{} + }{ + // 0 Single deduplication label. + { + Input: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "3"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "c", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, + }, + Exp: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + {Name: "b", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "4"}, + {Name: "b", Value: "replica-1"}, + }}, + }, + DedupLabels: map[string]struct{}{"b": struct{}{}}, + }, + // 1 Multi deduplication labels. + { + Input: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "3"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + {Name: "c", Value: "4"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "b1", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, + }, + Exp: []storepb.Series{ + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + {Name: "b1", Value: "replica-2"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "d", Value: "4"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "4"}, + {Name: "b", Value: "replica-1"}, + {Name: "b1", Value: "replica-1"}, + }}, + }, + DedupLabels: map[string]struct{}{ + "b": struct{}{}, + "b1": struct{}{}, + }, + }, } - - sortDedupLabels(set, "b") - - exp := []storepb.Series{ - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "b", Value: "replica-1"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "b", Value: "replica-2"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "3"}, - {Name: "d", Value: "4"}, - {Name: "b", Value: "replica-1"}, - }}, - {Labels: []storepb.Label{ - {Name: "a", Value: "1"}, - {Name: "c", Value: "4"}, - {Name: "b", Value: "replica-1"}, - }}, + for _, test := range tests { + t.Run("", func(t *testing.T) { + sortDedupLabels(test.Input, test.DedupLabels) + testutil.Equals(t, test.Exp, test.Input) + }) } - testutil.Equals(t, exp, set) } func expandSeries(t testing.TB, it storage.SeriesIterator) (res []sample) { @@ -354,7 +427,7 @@ func TestDedupSeriesSet(t *testing.T) { maxt: math.MaxInt64, set: newStoreSeriesSet(series), } - dedupSet := newDedupSeriesSet(set, "replica") + dedupSet := newDedupSeriesSet(set, map[string]struct{}{"replica": struct{}{}}) i := 0 for dedupSet.Next() { From 0837d370133ee6167a40f4569c7e2b4909a09fba Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 31 Jul 2019 17:33:10 +0300 Subject: [PATCH 02/14] changelog Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 2 ++ cmd/thanos/query.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c485a31c4b..aee103667f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Changed +- [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`. + - [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately. ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 7ed348731f..33aa1a3794 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -64,7 +64,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() - replicaLabels := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). + replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter."). Strings() selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). From 6ded7bef8213a16b2ba84360e6cab94a4409a5bb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 31 Jul 2019 17:37:13 +0300 Subject: [PATCH 03/14] docs Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- docs/components/query.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/components/query.md b/docs/components/query.md index 47e57027f6..9669109603 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -230,8 +230,8 @@ Flags: --query.timeout=2m Maximum time to process query by query node. --query.max-concurrent=20 Maximum number of queries processed concurrently by query node. - --query.replica-label=QUERY.REPLICA-LABEL - Label to treat as a replica indicator along + --query.replica-label=QUERY.REPLICA-LABEL ... + Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. From afac811cb9cffb7e5b7a156a28ba783b97a3d0ce Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 1 Aug 2019 16:30:39 +0300 Subject: [PATCH 04/14] bug in the replicate labels stripping Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/iter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/query/iter.go b/pkg/query/iter.go index a4ca1d8569..93ea950124 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -328,7 +328,7 @@ func (s *dedupSeriesSet) peekLset() labels.Labels { if _, ok := s.replicaLabels[lset[len(lset)-1].Name]; !ok { return lset } - return lset[:len(lset)-1] + return lset[:len(lset)-len(s.replicaLabels)] } func (s *dedupSeriesSet) next() bool { From 01d227dfde8e14fa973f17af702f3aaba5ba4caa Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 1 Aug 2019 16:31:39 +0300 Subject: [PATCH 05/14] extended the TestDedupSeriesSet with multi replica labels. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/querier_test.go | 262 +++++++++++++++++++++++++------------- 1 file changed, 172 insertions(+), 90 deletions(-) diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 58a137750b..424d387c55 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -217,13 +217,13 @@ func TestSortReplicaLabel(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() tests := []struct { - Input []storepb.Series - Exp []storepb.Series - DedupLabels map[string]struct{} + input []storepb.Series + exp []storepb.Series + dedupLabels map[string]struct{} }{ // 0 Single deduplication label. { - Input: []storepb.Series{ + input: []storepb.Series{ {Labels: []storepb.Label{ {Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, @@ -246,7 +246,7 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "c", Value: "3"}, }}, }, - Exp: []storepb.Series{ + exp: []storepb.Series{ {Labels: []storepb.Label{ {Name: "a", Value: "1"}, {Name: "c", Value: "3"}, @@ -269,11 +269,11 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "b", Value: "replica-1"}, }}, }, - DedupLabels: map[string]struct{}{"b": struct{}{}}, + dedupLabels: map[string]struct{}{"b": struct{}{}}, }, // 1 Multi deduplication labels. { - Input: []storepb.Series{ + input: []storepb.Series{ {Labels: []storepb.Label{ {Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, @@ -300,7 +300,7 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "c", Value: "3"}, }}, }, - Exp: []storepb.Series{ + exp: []storepb.Series{ {Labels: []storepb.Label{ {Name: "a", Value: "1"}, {Name: "c", Value: "3"}, @@ -327,7 +327,7 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "b1", Value: "replica-1"}, }}, }, - DedupLabels: map[string]struct{}{ + dedupLabels: map[string]struct{}{ "b": struct{}{}, "b1": struct{}{}, }, @@ -335,8 +335,8 @@ func TestSortReplicaLabel(t *testing.T) { } for _, test := range tests { t.Run("", func(t *testing.T) { - sortDedupLabels(test.Input, test.DedupLabels) - testutil.Equals(t, test.Exp, test.Input) + sortDedupLabels(test.input, test.dedupLabels) + testutil.Equals(t, test.exp, test.input) }) } } @@ -353,91 +353,173 @@ func expandSeries(t testing.TB, it storage.SeriesIterator) (res []sample) { func TestDedupSeriesSet(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - input := []struct { - lset []storepb.Label - vals []sample - }{ - { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, - vals: []sample{{60000, 3}, {70000, 4}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{200000, 5}, {210000, 6}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, { - lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, - vals: []sample{{60000, 3}, {70000, 4}}, - }, - } - exp := []struct { - lset labels.Labels - vals []sample + tests := []struct { + input []struct { + lset []storepb.Label + vals []sample + } + exp []struct { + lset labels.Labels + vals []sample + } + dedupLabels map[string]struct{} }{ - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, - vals: []sample{{10000, 1}, {20000, 2}}, - }, - { - lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, - vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + { // 0 Single dedup label. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{200000, 5}, {210000, 6}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, + }, + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + }, }, - } - var series []storepb.Series - for _, c := range input { - chk := chunkenc.NewXORChunk() - app, _ := chk.Appender() - for _, s := range c.vals { - app.Append(s.t, s.v) - } - series = append(series, storepb.Series{ - Labels: c.lset, - Chunks: []storepb.AggrChunk{ - {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}}, + { // 1 Multi dedup label. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}, {Name: "replicaA", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{200000, 5}, {210000, 6}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-3"}, {Name: "replicaA", Value: "replica-3"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, }, - }) - } - set := promSeriesSet{ - mint: 1, - maxt: math.MaxInt64, - set: newStoreSeriesSet(series), + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, + { + lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + "replicaA": struct{}{}, + }, + }, } - dedupSet := newDedupSeriesSet(set, map[string]struct{}{"replica": struct{}{}}) - i := 0 - for dedupSet.Next() { - testutil.Equals(t, exp[i].lset, dedupSet.At().Labels()) - - res := expandSeries(t, dedupSet.At().Iterator()) - testutil.Equals(t, exp[i].vals, res) - i++ + for _, test := range tests { + t.Run("", func(t *testing.T) { + var series []storepb.Series + for _, c := range test.input { + chk := chunkenc.NewXORChunk() + app, _ := chk.Appender() + for _, s := range c.vals { + app.Append(s.t, s.v) + } + series = append(series, storepb.Series{ + Labels: c.lset, + Chunks: []storepb.AggrChunk{ + {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}}, + }, + }) + } + set := promSeriesSet{ + mint: 1, + maxt: math.MaxInt64, + set: newStoreSeriesSet(series), + } + dedupSet := newDedupSeriesSet(set, test.dedupLabels) + + i := 0 + for dedupSet.Next() { + testutil.Equals(t, test.exp[i].lset, dedupSet.At().Labels(), "labels mismatch at index:%v", i) + res := expandSeries(t, dedupSet.At().Iterator()) + testutil.Equals(t, test.exp[i].vals, res, "values mismatch at index:%v", i) + i++ + } + testutil.Ok(t, dedupSet.Err()) + }) } - testutil.Ok(t, dedupSet.Err()) } func TestDedupSeriesIterator(t *testing.T) { From a35f3200d0b24d863659569de1b19eafcc2e6708 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 1 Aug 2019 16:59:47 +0300 Subject: [PATCH 06/14] docs update Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- docs/components/query.md | 24 +++++++++++++++++++++--- docs/getting-started.md | 1 + 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/docs/components/query.md b/docs/components/query.md index 9669109603..039b2ac981 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -23,10 +23,12 @@ $ thanos query \ ## Deduplication The query layer can deduplicate series that were collected from high-availability pairs of data sources such as Prometheus. -A fixed replica label must be chosen for the entire cluster and can then be passed to query nodes on startup. +A fixed single or multiple replica labels must be chosen for the entire cluster and can then be passed to query nodes on startup. Two or more series that are only distinguished by the given replica label, will be merged into a single time series. -This also hides gaps in collection of a single data source. For example: +This also hides gaps in collection of a single data source. + +### An example with a single replica labels: * Prometheus + sidecar "A": `cluster=1,env=2,replica=A` * Prometheus + sidecar "B": `cluster=1,env=2,replica=B` @@ -47,12 +49,28 @@ And we query for metric `up{job="prometheus",env="2"}` with this option we will * `up{job="prometheus",env="2",cluster="1"} 1` * `up{job="prometheus",env="2",cluster="2"} 1` -WITHOUT this replica flag (so deduplication turned off), we will get 3 results: +WITHOUT this replica flag (deduplication turned off), we will get 3 results: * `up{job="prometheus",env="2",cluster="1",replica="A"} 1` * `up{job="prometheus",env="2",cluster="1",replica="B"} 1` * `up{job="prometheus",env="2",cluster="2",replica="A"} 1` +### The same example with multiple replica labels: + +* Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A` +* Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B` +* Prometheus + sidecar "A" in different cluster: `cluster=2,env=2,replica=A,replicaX=A` + +``` +$ thanos query \ + --http-address "0.0.0.0:9090" \ + --query.replica-label "replica" \ + --query.replica-label "replicaX" \ + --store ":" \ + --store ":" \ +``` + + This logic can also be controlled via parameter on QueryAPI. More details below. ## Query API diff --git a/docs/getting-started.md b/docs/getting-started.md index 8ef0047bda..a6aea81635 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -166,6 +166,7 @@ thanos query \ --store 1.2.3.4:19090 \ --store 1.2.3.5:19090 \ --query.replica-label replica # Replica label for de-duplication + --query.replica-label replicaX # Supports multiple replica labels for de-duplication ``` Go to the configured HTTP address, and you should now be able to query across all Prometheus instances and receive de-duplicated data. From d8d83e0c78b5ac712fc2897176fe4a84598475c3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 2 Aug 2019 14:32:40 +0300 Subject: [PATCH 07/14] added replicaLabels to api Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1.go | 36 +++++++++++++++++++++++++++++++----- pkg/query/api/v1_test.go | 2 +- pkg/query/querier.go | 9 +++++++-- pkg/query/querier_test.go | 4 ++-- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 7ec9e64f03..47c9b44397 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -204,6 +204,17 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool return enableDeduplication, nil } +func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) { + if err := r.ParseForm(); err != nil { + return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} + } + + if len(r.Form["replicaLabels[]"]) > 0 { + replicaLabels = r.Form["match[]"] + } + return replicaLabels, nil +} + func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) { const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution := 0 * time.Second @@ -274,6 +285,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) if apiErr != nil { return nil, nil, apiErr @@ -294,7 +310,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { defer span.Finish() begin := api.now() - qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts) + qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts) if err != nil { return nil, nil, &ApiError{errorBadData, err} } @@ -367,6 +383,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step) if apiErr != nil { return nil, nil, apiErr @@ -393,7 +414,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { begin := api.now() qry, err := api.queryEngine.NewRangeQuery( - api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse, warningReporter), + api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, warningReporter), r.FormValue("query"), start, end, @@ -444,7 +465,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { warnmtx.Unlock() } - q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -510,6 +531,11 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } + replicaLabels, apiErr := api.parseReplicaLabelsParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) if apiErr != nil { return nil, nil, apiErr @@ -526,7 +552,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { } // TODO(bwplotka): Support downsampling? - q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -637,7 +663,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { warnmtx.Unlock() } - q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 36fcf06644..e96435ca81 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -44,7 +44,7 @@ import ( ) func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable { + return func(_ bool, _ []string, _ int64, _ bool, _ query.WarningReporter) storage.Queryable { return queryable } } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 21acfb04cb..72c42e4d8a 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -22,13 +22,18 @@ type WarningReporter func(error) // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. // If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. +// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifing +// replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. -type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabels []string) QueryableCreator { - return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable { + return func(deduplicate bool, replicaLabelsOverwrite []string, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable { + if len(replicaLabelsOverwrite) > 0 { + replicaLabels = replicaLabelsOverwrite + } return &queryable{ logger: logger, replicaLabels: replicaLabels, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 424d387c55..d1e349fe55 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, testProxy, []string{"test"}) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, oneHourMillis, false, func(err error) {}) + queryable := queryableCreator(false, nil, oneHourMillis, false, func(err error) {}) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy, []string{""})(false, 9999999, false, nil) + q := NewQueryableCreator(nil, testProxy, []string{""})(false, nil, 9999999, false, nil) engine := promql.NewEngine( promql.EngineOpts{ From 707eca9047d2af489e15162720d8c72818b59f87 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 2 Aug 2019 15:02:44 +0300 Subject: [PATCH 08/14] updated docs for the API replica labels. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 1 + docs/components/query.md | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aee103667f..3810c1bc97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type +- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided overwrite the `query.replica-label` cli flags. ### Changed diff --git a/docs/components/query.md b/docs/components/query.md index 039b2ac981..b15700ab5e 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -105,6 +105,15 @@ Querier also allows to configure different timeouts: If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response. +### Deduplication replica labels. + +| HTTP URL/FORM parameter | Type | Default | Example | +|----|----|----|----| +| `replicaLabels` | `[]string` | `query.replica-label` flag (default: empty). | `replicaLabels=replicaA&replicaLabels=replicaB` | +| | | | | + +This overwrites the `query.replica-label` cli flag to allow dynamic replica labels at query time. + ### Deduplication Enabled | HTTP URL/FORM parameter | Type | Default | Example | @@ -112,7 +121,7 @@ strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warn | `dedup` | `Boolean` | True, but effect depends on `query.replica` configuration flag. | `1, t, T, TRUE, true, True` for "True" | | | | | | -This controls if query should use `replica` label for deduplication or not. +This controls if query results should be deduplicated using the replica labels. ### Auto downsampling From fccf8c8e671ab25116defd0e34688a4ab68a35e7 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 28 Aug 2019 17:07:36 +0300 Subject: [PATCH 09/14] missing dedup label from some series Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/iter.go | 12 ++++++++++-- pkg/query/querier.go | 4 ---- pkg/query/querier_test.go | 29 ++++++++++++++++++++++++++++- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 9e7b0e44e7..65d0c1db0f 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -360,10 +360,18 @@ func (s *dedupSeriesSet) Next() bool { // replica label if it exists func (s *dedupSeriesSet) peekLset() labels.Labels { lset := s.peek.Labels() - if _, ok := s.replicaLabels[lset[len(lset)-1].Name]; !ok { + if len(s.replicaLabels) == 0 { return lset } - return lset[:len(lset)-len(s.replicaLabels)] + // Check how many replica labels are present so that these are removed. + var totalToRemove int + for index := 0; index < len(s.replicaLabels); index++ { + if _, ok := s.replicaLabels[lset[len(lset)-index-1].Name]; ok { + totalToRemove++ + } + } + // Strip all present replica labels. + return lset[:len(lset)-totalToRemove] } func (s *dedupSeriesSet) next() bool { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index ada6a88afd..f617b5c9ea 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -215,11 +215,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // The merged series set assembles all potentially-overlapping time ranges // of the same series into a single one. The series are ordered so that equal series // from different replicas are sequential. We can now deduplicate those. -<<<<<<< HEAD return newDedupSeriesSet(set, q.replicaLabels), warns, nil -======= - return newDedupSeriesSet(set, q.replicaLabel), warns, nil ->>>>>>> upstream/master } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 894d5c6704..52f22e5b39 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy, "")(false, nil, 9999999, false) + q := NewQueryableCreator(nil, testProxy, []string{""})(false, nil, 9999999, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -489,6 +489,33 @@ func TestDedupSeriesSet(t *testing.T) { "replicaA": struct{}{}, }, }, + { // 2 Multi dedup label - some series don't have all dedup labels. + input: []struct { + lset []storepb.Label + vals []sample + }{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-1"}, {Name: "replicaA", Value: "replica-1"}}, + vals: []sample{{10000, 1}, {20000, 2}}, + }, { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "replica", Value: "replica-2"}}, + vals: []sample{{60000, 3}, {70000, 4}}, + }, + }, + exp: []struct { + lset labels.Labels + vals []sample + }{ + { + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, + vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, + }, + }, + dedupLabels: map[string]struct{}{ + "replica": struct{}{}, + "replicaA": struct{}{}, + }, + }, } for _, test := range tests { From ae7faf10292af786318f6dc34ab6c96c8f48d51c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 28 Aug 2019 18:18:18 +0300 Subject: [PATCH 10/14] nice review changes Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- cmd/thanos/query.go | 4 ++-- pkg/query/api/v1.go | 22 +++++++++++++++------- pkg/query/querier.go | 7 ++----- pkg/query/querier_test.go | 14 ++++++++++++-- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4a0d0d4f15..70447602fd 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -313,7 +313,7 @@ func runQuery( unhealthyStoreTimeout, ) proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) - queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabels) + queryableCreator = query.NewQueryableCreator(logger, proxy) engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, @@ -405,7 +405,7 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, instantDefaultMaxSourceResolution) + api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 50aa2ff4e6..bcce34a3ca 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -102,6 +102,7 @@ type API struct { enableAutodownsampling bool enablePartialResponse bool + replicaLabels []string reg prometheus.Registerer defaultInstantQueryMaxSourceResolution time.Duration @@ -116,6 +117,7 @@ func NewAPI( c query.QueryableCreator, enableAutodownsampling bool, enablePartialResponse bool, + replicaLabels []string, defaultInstantQueryMaxSourceResolution time.Duration, ) *API { return &API{ @@ -124,6 +126,7 @@ func NewAPI( queryableCreate: c, enableAutodownsampling: enableAutodownsampling, enablePartialResponse: enablePartialResponse, + replicaLabels: replicaLabels, reg: reg, defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, @@ -186,13 +189,17 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool } func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) { + const replicaLabelsParam = "replicLabeals[]" if err := r.ParseForm(); err != nil { return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} } - if len(r.Form["replicaLabels[]"]) > 0 { - replicaLabels = r.Form["match[]"] + replicaLabels = api.replicaLabels + // Overwrite the cli flag when provided as a query parameter. + if len(r.Form[replicaLabelsParam]) > 0 { + replicaLabels = r.Form[replicaLabelsParam] } + return replicaLabels, nil } @@ -223,6 +230,7 @@ func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialRespons const partialResponseParam = "partial_response" enablePartialResponse = api.enablePartialResponse + // Overwrite the cli flag when provided as a query parameter. if val := r.FormValue(partialResponseParam); val != "" { var err error enablePartialResponse, err = strconv.ParseBool(val) @@ -285,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels,maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts) + qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts) if err != nil { return nil, nil, &ApiError{errorBadData, err} } @@ -378,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { defer span.Finish() qry, err := api.queryEngine.NewRangeQuery( - api.queryableCreate(enableDedup,replicaLabels, maxSourceResolution, enablePartialResponse), + api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), start, end, @@ -418,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true,nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -495,7 +503,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { } // TODO(bwplotka): Support downsampling? - q, err := api.queryableCreate(enableDedup,replicaLabels, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -598,7 +606,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true,nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index f617b5c9ea..dcf61b2a7b 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -22,11 +22,8 @@ import ( type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. -func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabels []string) QueryableCreator { - return func(deduplicate bool, replicaLabelsOverwrite []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { - if len(replicaLabelsOverwrite) > 0 { - replicaLabels = replicaLabelsOverwrite - } +func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator { + return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 52f22e5b39..00d416e88c 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -25,7 +25,7 @@ import ( func TestQueryableCreator_MaxResolution(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} - queryableCreator := NewQueryableCreator(nil, testProxy, []string{"test"}) + queryableCreator := NewQueryableCreator(nil, testProxy) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) queryable := queryableCreator(false, nil, oneHourMillis, false) @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy, []string{""})(false, nil, 9999999, false) + q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -303,6 +303,11 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "b1", Value: "replica-2"}, {Name: "c", Value: "3"}, }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "replica-2"}, + {Name: "c", Value: "3"}, + }}, }, exp: []storepb.Series{ {Labels: []storepb.Label{ @@ -311,6 +316,11 @@ func TestSortReplicaLabel(t *testing.T) { {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}, }}, + {Labels: []storepb.Label{ + {Name: "a", Value: "1"}, + {Name: "c", Value: "3"}, + {Name: "b", Value: "replica-2"}, + }}, {Labels: []storepb.Label{ {Name: "a", Value: "1"}, {Name: "c", Value: "3"}, From 841b889f25e55d5a0fec144879cee6a84f361f2e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 12 Sep 2019 14:42:22 +0300 Subject: [PATCH 11/14] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- docs/components/query.md | 2 +- go.mod | 1 - go.sum | 4 ---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/docs/components/query.md b/docs/components/query.md index 1128a609ce..107dd03c87 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -55,7 +55,7 @@ WITHOUT this replica flag (deduplication turned off), we will get 3 results: * `up{job="prometheus",env="2",cluster="1",replica="B"} 1` * `up{job="prometheus",env="2",cluster="2",replica="A"} 1` -### The same example with multiple replica labels: +### The same output will be present for this example with multiple replica labels: * Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A` * Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B` diff --git a/go.mod b/go.mod index abb3e94512..88022eb4d5 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,6 @@ require ( github.com/prometheus/client_golang v1.1.0 github.com/prometheus/common v0.6.0 github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 // v1.8.2 is misleading as Prometheus does not have v2 module. This is pointing to one commit after 2.12.0. - github.com/prometheus/tsdb v0.10.0 // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible github.com/uber/jaeger-lib v2.0.0+incompatible diff --git a/go.sum b/go.sum index 3d24c2c7c8..18caffd34c 100644 --- a/go.sum +++ b/go.sum @@ -68,7 +68,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda h1:NyywMz59neOoVRFDz+ccfKWxn784fiHMDnZSy6T+JXY= github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -392,8 +391,6 @@ github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 h1:yZAWzfQYJN+vduRHL5jcTrVw+XwYU52ZrAhprmwoknI= github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0= -github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38ic= -github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -443,7 +440,6 @@ go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/automaxprocs v1.2.0 h1:+RUihKM+nmYUoB9w0D0Ov5TJ2PpFO2FgenTxMJiZBZA= go.uber.org/automaxprocs v1.2.0/go.mod h1:YfO3fm683kQpzETxlTGZhGIVmXAhaw3gxeBADbpZtnU= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= From b29289751beadd560e9d2c93ecd9eb2be75aa879 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 12 Sep 2019 14:42:33 +0300 Subject: [PATCH 12/14] replicaLabels typo Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index bcce34a3ca..c2e34220a1 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -189,7 +189,7 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool } func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) { - const replicaLabelsParam = "replicLabeals[]" + const replicaLabelsParam = "replicaLabels[]" if err := r.ParseForm(); err != nil { return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} } From 5c4ae8c0b3c6b7507e5184a03b380ca3365e480a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 13 Sep 2019 11:38:07 +0300 Subject: [PATCH 13/14] added api replica tests Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1_test.go | 364 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 360 insertions(+), 4 deletions(-) diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 586a973389..164b101d21 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" @@ -33,18 +34,164 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/compact" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) +type testStoreServer struct { + storage.Queryable +} + +func (s *testStoreServer) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { + return nil, errors.New("not implemented") +} + +func (s *testStoreServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { + q, err := s.Querier(context.Background(), r.MinTime, r.MaxTime) + if err != nil { + return err + } + defer func() { + if errClose := q.Close(); errClose != nil { + err = errClose + } + }() + + matchers, err := translateMatchers(r.Matchers) + seriesSets, warn, err := q.Select(&storage.SelectParams{}, matchers...) + if len(warn) != 0 { + return fmt.Errorf("querier selection contains warnings: %v", warn) + } + for seriesSets.Next() { + ss := seriesSets.At() + it := ss.Iterator() + samples := make([]sample, 0) + + for it.Next() { + t, v := it.At() + samples = append(samples, sample{t: t, v: v}) + } + if it.Err() != nil { + return it.Err() + } + resp, err := storeSeriesResponse(ss.Labels(), samples) + if err != nil { + return err + } + err = srv.Send(resp) + if err != nil { + return err + } + } + if seriesSets.Err() != nil { + return seriesSets.Err() + } + return nil +} + +func (s *testStoreServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) + if err != nil { + return nil, err + } + names, _, err := q.LabelNames() + if err != nil { + return nil, err + } + if err := q.Close(); err != nil { + return nil, err + } + return &storepb.LabelNamesResponse{Names: names}, nil +} +func (s *testStoreServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) + if err != nil { + return nil, err + } + values, _, err := q.LabelValues(req.Label) + if err != nil { + return nil, err + } + if err := q.Close(); err != nil { + return nil, err + } + return &storepb.LabelValuesResponse{Values: values}, nil +} + +type sample struct { + t int64 + v float64 +} + +func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) { + switch m.Type { + case storepb.LabelMatcher_EQ: + return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value) + case storepb.LabelMatcher_NEQ: + return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value) + case storepb.LabelMatcher_RE: + return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value) + case storepb.LabelMatcher_NRE: + return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value) + } + return nil, fmt.Errorf("unknown label matcher type %d", m.Type) +} + +func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) { + for _, m := range ms { + r, err := translateMatcher(m) + if err != nil { + return nil, err + } + res = append(res, r) + } + return res, nil +} + +// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. +func storeSeriesResponse(lset labels.Labels, smplChunks ...[]sample) (*storepb.SeriesResponse, error) { + var s storepb.Series + + for _, l := range lset { + s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) + } + + for _, smpls := range smplChunks { + c := chunkenc.NewXORChunk() + a, err := c.Appender() + if err != nil { + return nil, err + } + + for _, smpl := range smpls { + a.Append(smpl.t, smpl.v) + } + + ch := storepb.AggrChunk{ + MinTime: smpls[0].t, + MaxTime: smpls[len(smpls)-1].t, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + } + + s.Chunks = append(s.Chunks, ch) + } + return storepb.NewSeriesResponse(&s), nil +} + func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(_ bool, _ []string, _ int64, _ bool) storage.Queryable { - return queryable + return func(dedup bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { + testProxy := &testStoreServer{ + Queryable: queryable, + } + return query.NewQueryableCreator(nil, testProxy)(dedup, replicaLabels, maxResolutionMillis, partialResponse) } } @@ -54,6 +201,10 @@ func TestEndpoints(t *testing.T) { test_metric1{foo="bar"} 0+100x100 test_metric1{foo="boo"} 1+0x100 test_metric2{foo="boo"} 1+0x100 + test_metric_replica1{foo="bar",replica="a"} 1+1x1 + test_metric_replica1{foo="boo",replica="a"} 1+1x1 + test_metric_replica1{foo="boo",replica="b"} 1+1x1 + test_metric_replica1{foo="boo",replica1="a"} 1+1x1 `) if err != nil { t.Fatal(err) @@ -69,8 +220,7 @@ func TestEndpoints(t *testing.T) { api := &API{ queryableCreate: testQueryableCreator(suite.Storage()), queryEngine: suite.QueryEngine(), - - now: func() time.Time { return now }, + now: func() time.Time { return now }, } start := time.Unix(0, 0) @@ -125,6 +275,211 @@ func TestEndpoints(t *testing.T) { }, }, }, + // Query endpoint without deduplication. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica", + Value: "b", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with single deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + { + Name: "replica1", + Value: "a", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, + // Query endpoint with multiple deduplication label. + { + endpoint: api.query, + query: url.Values{ + "query": []string{"test_metric_replica1"}, + "time": []string{"1970-01-01T01:02:03+01:00"}, + "replicaLabels[]": []string{"replica", "replica1"}, + }, + response: &queryData{ + ResultType: promql.ValueTypeVector, + Result: promql.Vector{ + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + { + Metric: labels.Labels{ + { + Name: "__name__", + Value: "test_metric_replica1", + }, + { + Name: "foo", + Value: "boo", + }, + }, + Point: promql.Point{ + T: 123000, + V: 2, + }, + }, + }, + }, + }, { endpoint: api.query, query: url.Values{ @@ -269,6 +624,7 @@ func TestEndpoints(t *testing.T) { response: []string{ "test_metric1", "test_metric2", + "test_metric_replica1", }, }, { From b8cd889aab458ec0dd5d071cc92bebd9f2c25c9c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Mon, 16 Sep 2019 12:57:18 +0300 Subject: [PATCH 14/14] use tsdb storage package for the proxy Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- pkg/query/api/v1_test.go | 228 ++++++++++----------------------------- 1 file changed, 57 insertions(+), 171 deletions(-) diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 164b101d21..6c5654b535 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io" "io/ioutil" - "math" "math/rand" "net/http" "net/http/httptest" @@ -30,197 +29,84 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" + tsdb_labels "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/component" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" - "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/testutil" ) -type testStoreServer struct { - storage.Queryable -} - -func (s *testStoreServer) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { - return nil, errors.New("not implemented") -} - -func (s *testStoreServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - q, err := s.Querier(context.Background(), r.MinTime, r.MaxTime) - if err != nil { - return err - } - defer func() { - if errClose := q.Close(); errClose != nil { - err = errClose - } - }() - - matchers, err := translateMatchers(r.Matchers) - seriesSets, warn, err := q.Select(&storage.SelectParams{}, matchers...) - if len(warn) != 0 { - return fmt.Errorf("querier selection contains warnings: %v", warn) - } - for seriesSets.Next() { - ss := seriesSets.At() - it := ss.Iterator() - samples := make([]sample, 0) - - for it.Next() { - t, v := it.At() - samples = append(samples, sample{t: t, v: v}) - } - if it.Err() != nil { - return it.Err() - } - resp, err := storeSeriesResponse(ss.Labels(), samples) - if err != nil { - return err - } - err = srv.Send(resp) - if err != nil { - return err - } - } - if seriesSets.Err() != nil { - return seriesSets.Err() - } - return nil -} - -func (s *testStoreServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) - if err != nil { - return nil, err - } - names, _, err := q.LabelNames() - if err != nil { - return nil, err - } - if err := q.Close(); err != nil { - return nil, err - } - return &storepb.LabelNamesResponse{Names: names}, nil -} -func (s *testStoreServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - q, err := s.Querier(context.Background(), math.MinInt64, math.MaxInt64) - if err != nil { - return nil, err - } - values, _, err := q.LabelValues(req.Label) - if err != nil { - return nil, err - } - if err := q.Close(); err != nil { - return nil, err - } - return &storepb.LabelValuesResponse{Values: values}, nil -} - -type sample struct { - t int64 - v float64 -} - -func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) { - switch m.Type { - case storepb.LabelMatcher_EQ: - return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value) - case storepb.LabelMatcher_NEQ: - return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value) - case storepb.LabelMatcher_RE: - return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value) - case storepb.LabelMatcher_NRE: - return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value) - } - return nil, fmt.Errorf("unknown label matcher type %d", m.Type) -} - -func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) { - for _, m := range ms { - r, err := translateMatcher(m) - if err != nil { - return nil, err - } - res = append(res, r) - } - return res, nil -} - -// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. -func storeSeriesResponse(lset labels.Labels, smplChunks ...[]sample) (*storepb.SeriesResponse, error) { - var s storepb.Series +func TestEndpoints(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() - for _, l := range lset { - s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) + lbls := []tsdb_labels.Labels{ + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric2"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "bar"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "a"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica", Value: "b"}, + }, + tsdb_labels.Labels{ + tsdb_labels.Label{Name: "__name__", Value: "test_metric_replica1"}, + tsdb_labels.Label{Name: "foo", Value: "boo"}, + tsdb_labels.Label{Name: "replica1", Value: "a"}, + }, } - for _, smpls := range smplChunks { - c := chunkenc.NewXORChunk() - a, err := c.Appender() - if err != nil { - return nil, err - } - - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) - } - - ch := storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - } - - s.Chunks = append(s.Chunks, ch) - } - return storepb.NewSeriesResponse(&s), nil -} + db, err := testutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) -func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(dedup bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { - testProxy := &testStoreServer{ - Queryable: queryable, + app := db.Appender() + for _, lbl := range lbls { + for i := int64(0); i < 10; i++ { + _, err := app.Add(lbl, i*60000, float64(i)) + testutil.Ok(t, err) } - return query.NewQueryableCreator(nil, testProxy)(dedup, replicaLabels, maxResolutionMillis, partialResponse) - } -} - -func TestEndpoints(t *testing.T) { - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar"} 0+100x100 - test_metric1{foo="boo"} 1+0x100 - test_metric2{foo="boo"} 1+0x100 - test_metric_replica1{foo="bar",replica="a"} 1+1x1 - test_metric_replica1{foo="boo",replica="a"} 1+1x1 - test_metric_replica1{foo="boo",replica="b"} 1+1x1 - test_metric_replica1{foo="boo",replica1="a"} 1+1x1 - `) - if err != nil { - t.Fatal(err) - } - defer suite.Close() - - if err := suite.Run(); err != nil { - t.Fatal(err) } + testutil.Ok(t, app.Commit()) now := time.Now() - api := &API{ - queryableCreate: testQueryableCreator(suite.Storage()), - queryEngine: suite.QueryEngine(), - now: func() time.Time { return now }, + queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)), + queryEngine: promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 20, + MaxSamples: 10000, + Timeout: 100 * time.Second, + }), + now: func() time.Time { return now }, } start := time.Unix(0, 0)