Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryFrontend: fixing bug on cortex cache for range query when using dynamic sharding #5688

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
## Unreleased

### Fixed
- [#5688](https://github.com/thanos-io/thanos/pull/5688) Query Frontend: Fix bug with dynamic horizontal sharding parameters and cache key generator.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the feature is not yet released, I don't think we need a changelog entry.

- [#5642](https://github.com/thanos-io/thanos/pull/5642) Receive: Log labels correctly in writer debug messages.
- [#5655](https://github.com/thanos-io/thanos/pull/5655) Receive: Fix recreating already pruned tenants.

Expand Down
9 changes: 4 additions & 5 deletions pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@ package queryfrontend

import (
"fmt"
pedro-stanaka marked this conversation as resolved.
Show resolved Hide resolved
"time"

"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// thanosCacheKeyGenerator is a utility for using split interval when determining cache keys.
type thanosCacheKeyGenerator struct {
interval time.Duration
interval queryrange.IntervalFn
resolutions []int64
}

func newThanosCacheKeyGenerator(interval time.Duration) thanosCacheKeyGenerator {
func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKeyGenerator {
return thanosCacheKeyGenerator{
interval: interval,
interval: intervalFn,
resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0},
}
}

// GenerateCacheKey generates a cache key based on the Request and interval.
// TODO(yeya24): Add other request params as request key.
func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string {
currentInterval := r.GetStart() / t.interval.Milliseconds()
currentInterval := r.GetStart() / t.interval(r).Milliseconds()
switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
Expand Down
4 changes: 3 additions & 1 deletion pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryfrontend

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"

Expand All @@ -14,7 +15,8 @@ import (
)

func TestGenerateCacheKey(t *testing.T) {
splitter := newThanosCacheKeyGenerator(hour)
intervalFn := func(r queryrange.Request) time.Duration { return hour }
splitter := newThanosCacheKeyGenerator(intervalFn)

for _, tc := range []struct {
name string
Expand Down
5 changes: 3 additions & 2 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func newQueryRangeTripperware(
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(config.SplitQueriesByInterval),
newThanosCacheKeyGenerator(dynamicIntervalFn(config)),
limits,
codec,
queryrange.PrometheusResponseExtractor{},
Expand Down Expand Up @@ -283,10 +283,11 @@ func newLabelsTripperware(
}

if config.ResultsCacheConfig != nil {
staticIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval }
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(config.SplitQueriesByInterval),
newThanosCacheKeyGenerator(staticIntervalFn),
limits,
codec,
ThanosResponseExtractor{},
Expand Down
7 changes: 7 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config quer
flags["--query-frontend.vertical-shards"] = strconv.Itoa(config.NumShards)
}

if config.QueryRangeConfig.MinQuerySplitInterval != 0 {
flags["--query-range.min-split-interval"] = config.QueryRangeConfig.MinQuerySplitInterval.String()
flags["--query-range.max-split-interval"] = config.QueryRangeConfig.MaxQuerySplitInterval.String()
flags["--query-range.horizontal-shards"] = strconv.FormatInt(config.QueryRangeConfig.HorizontalShards, 10)
flags["--query-range.split-interval"] = "0"
}

return e2e.NewInstrumentedRunnable(
e, fmt.Sprintf("query-frontend-%s", name),
).WithPorts(map[string]int{"http": 8080}, "http").Init(
Expand Down
95 changes: 95 additions & 0 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,101 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) {
testutil.Equals(t, resultWithoutSharding, resultWithSharding)
}

func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("e2e-test-query-frontend")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

now := time.Now()

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))

querier := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Type: queryfrontend.INMEMORY,
Config: queryfrontend.InMemoryResponseCacheConfig{
MaxSizeItems: 1000,
Validity: time.Hour,
},
}

cfg := queryfrontend.Config{
QueryRangeConfig: queryfrontend.QueryRangeConfig{
MinQuerySplitInterval: time.Hour,
MaxQuerySplitInterval: 12 * time.Hour,
HorizontalShards: 4,
SplitQueriesByInterval: 0,
},
}
queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+querier.InternalEndpoint("http"), cfg, inMemoryCacheConfig)
testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"replica": "0",
},
})

// -- test starts here --
rangeQuery(
t,
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
},
func(res model.Matrix) error {
if len(res) == 0 {
return errors.Errorf("expected some results, got nothing")
}
return nil
},
)

testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions(
e2e.Equals(1),
[]string{"thanos_query_frontend_queries_total"},
e2e.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range")),
))

// make sure that we don't break cortex cache code.
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(0), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(3), "querier_cache_misses_total"))

// Query interval is 2 hours, which is greater than min-slit-interval, query will be broken down into 4 parts
// + rest (of interval)
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(5), "thanos_frontend_split_queries_total"))

testutil.Ok(t, querier.WaitSumMetricsWithOptions(
e2e.Equals(5),
[]string{"http_requests_total"},
e2e.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range")),
))
}

func TestInstantQueryShardingWithRandomData(t *testing.T) {
t.Parallel()

Expand Down