Skip to content

Commit

Permalink
QueryFrontend: fixing bug on cortex cache for range query when using …
Browse files Browse the repository at this point in the history
…dynamic sharding (#5688)

* Fixing bug with cache for QFE not playing well with new flags for dynamic sharding

Signed-off-by: Pedro Tanaka <[email protected]>

* Reproducing bug with cortex cache code with e2e test

Signed-off-by: Pedro Tanaka <[email protected]>

* Clean up

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding changelog entry

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing linting

Signed-off-by: Pedro Tanaka <[email protected]>

* removing entry from changelog

Signed-off-by: Pedro Tanaka <[email protected]>

Signed-off-by: Pedro Tanaka <[email protected]>
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka authored Sep 14, 2022
1 parent 0392dd7 commit 0d8562b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 8 deletions.
9 changes: 4 additions & 5 deletions pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@ package queryfrontend

import (
"fmt"
"time"

"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// thanosCacheKeyGenerator is a utility for using split interval when determining cache keys.
type thanosCacheKeyGenerator struct {
interval time.Duration
interval queryrange.IntervalFn
resolutions []int64
}

func newThanosCacheKeyGenerator(interval time.Duration) thanosCacheKeyGenerator {
func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKeyGenerator {
return thanosCacheKeyGenerator{
interval: interval,
interval: intervalFn,
resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0},
}
}

// GenerateCacheKey generates a cache key based on the Request and interval.
// TODO(yeya24): Add other request params as request key.
func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string {
currentInterval := r.GetStart() / t.interval.Milliseconds()
currentInterval := r.GetStart() / t.interval(r).Milliseconds()
switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
Expand Down
4 changes: 3 additions & 1 deletion pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryfrontend

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"

Expand All @@ -14,7 +15,8 @@ import (
)

func TestGenerateCacheKey(t *testing.T) {
splitter := newThanosCacheKeyGenerator(hour)
intervalFn := func(r queryrange.Request) time.Duration { return hour }
splitter := newThanosCacheKeyGenerator(intervalFn)

for _, tc := range []struct {
name string
Expand Down
5 changes: 3 additions & 2 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func newQueryRangeTripperware(
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(config.SplitQueriesByInterval),
newThanosCacheKeyGenerator(dynamicIntervalFn(config)),
limits,
codec,
queryrange.PrometheusResponseExtractor{},
Expand Down Expand Up @@ -283,10 +283,11 @@ func newLabelsTripperware(
}

if config.ResultsCacheConfig != nil {
staticIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval }
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(config.SplitQueriesByInterval),
newThanosCacheKeyGenerator(staticIntervalFn),
limits,
codec,
ThanosResponseExtractor{},
Expand Down
7 changes: 7 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config quer
flags["--query-frontend.vertical-shards"] = strconv.Itoa(config.NumShards)
}

if config.QueryRangeConfig.MinQuerySplitInterval != 0 {
flags["--query-range.min-split-interval"] = config.QueryRangeConfig.MinQuerySplitInterval.String()
flags["--query-range.max-split-interval"] = config.QueryRangeConfig.MaxQuerySplitInterval.String()
flags["--query-range.horizontal-shards"] = strconv.FormatInt(config.QueryRangeConfig.HorizontalShards, 10)
flags["--query-range.split-interval"] = "0"
}

return e2e.NewInstrumentedRunnable(
e, fmt.Sprintf("query-frontend-%s", name),
).WithPorts(map[string]int{"http": 8080}, "http").Init(
Expand Down
95 changes: 95 additions & 0 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,101 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) {
testutil.Equals(t, resultWithoutSharding, resultWithSharding)
}

// TestRangeQueryDynamicHorizontalSharding starts a Prometheus with sidecar, a
// querier and a query frontend backed by an in-memory response cache, where the
// frontend is configured with min/max split intervals and horizontal shards
// instead of a static split interval. It sends a single range query through the
// frontend and then asserts on the frontend's cache and split-query metrics and
// on the number of query_range requests that reached the querier.
func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("e2e-test-query-frontend")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// Captured once so that both ends of the range query below are derived
// from the same instant.
now := time.Now()

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))

querier := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Type: queryfrontend.INMEMORY,
Config: queryfrontend.InMemoryResponseCacheConfig{
MaxSizeItems: 1000,
Validity: time.Hour,
},
}

// The static split interval is left at 0; only the min/max split interval
// and the horizontal shard count are set.
cfg := queryfrontend.Config{
QueryRangeConfig: queryfrontend.QueryRangeConfig{
MinQuerySplitInterval: time.Hour,
MaxQuerySplitInterval: 12 * time.Hour,
HorizontalShards: 4,
SplitQueriesByInterval: 0,
},
}
queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+querier.InternalEndpoint("http"), cfg, inMemoryCacheConfig)
testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// Wait until the querier reports exactly one store gRPC connection.
testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"replica": "0",
},
})

// -- test starts here --
// The query window runs from one hour before to one hour after `now`,
// i.e. two hours in total. The literal 14 is presumably the step; confirm
// against the rangeQuery helper's signature.
rangeQuery(
t,
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
},
// Only checks that the result is non-empty.
func(res model.Matrix) error {
if len(res) == 0 {
return errors.Errorf("expected some results, got nothing")
}
return nil
},
)

// Exactly one query_range operation must have been counted by the frontend.
testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions(
e2e.Equals(1),
[]string{"thanos_query_frontend_queries_total"},
e2e.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range")),
))

// Make sure that we don't break the cortex cache code.
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(0), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "querier_cache_misses_total"))

// The query interval is 2 hours, which is greater than min-split-interval, so the
// query will be broken down into 4 parts + rest (of interval).
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(5), "thanos_frontend_split_queries_total"))

// The querier must have received the same number (5) of query_range HTTP requests.
testutil.Ok(t, querier.WaitSumMetricsWithOptions(
e2e.Equals(5),
[]string{"http_requests_total"},
e2e.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range")),
))
}

func TestInstantQueryShardingWithRandomData(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 0d8562b

Please sign in to comment.