Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add min_sharding_lookback limits to the frontends #4047

Merged
merged 3 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ The queryrange_config configures the query splitting and caching in the Loki que
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]
# Limit queries that can be sharded.
# Queries with time range that fall between now and now minus the sharding lookback are not sharded.
# Default value is 0s (disable), meaning all queries of all time range are sharded.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]
# Deprecated: Split queries by day and execute in parallel. Use -querier.split-queries-by-interval instead.
# CLI flag: -querier.split-queries-by-day
[split_queries_by_day: <boolean> | default = false]
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/grafana/loki/pkg/logql"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
Expand All @@ -22,9 +23,11 @@ const (
// Limits extends the cortex limits interface with support for per tenant splitby parameters
type Limits interface {
queryrange.Limits
logql.Limits
QuerySplitDuration(string) time.Duration
MaxQuerySeries(string) int
MaxEntriesLimitPerQuery(string) int
MinShardingLookback(string) time.Duration
}

type limits struct {
Expand Down
37 changes: 26 additions & 11 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package queryrange
import (
"context"
"fmt"
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
Expand All @@ -25,7 +28,7 @@ func NewQueryShardMiddleware(
confs queryrange.ShardingConfigs,
middlewareMetrics *queryrange.InstrumentMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
limits logql.Limits,
limits Limits,
) queryrange.Middleware {

noshards := !hasShards(confs)
Expand All @@ -44,10 +47,15 @@ func NewQueryShardMiddleware(
})

return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next)
return &shardSplitter{
limits: limits,
shardingware: queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next),
now: time.Now,
next: queryrange.InstrumentMiddleware("sharding-bypass", middlewareMetrics).Wrap(next),
}
})
}

Expand Down Expand Up @@ -173,15 +181,22 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
// This is used to send nonsharded requests to the ingesters in order to not overload them.
// TODO(owen-d): export in cortex so we don't duplicate code
type shardSplitter struct {
MinShardingLookback time.Duration // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
limits Limits // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
}

func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
cutoff := splitter.now().Add(-splitter.MinShardingLookback)

userid, err := tenant.TenantID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
minShardingLookback := splitter.limits.MinShardingLookback(userid)
if minShardingLookback == 0 {
return splitter.shardingware.Do(ctx, r)
}
cutoff := splitter.now().Add(-minShardingLookback)
// Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried).
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) {
return splitter.next.Do(ctx, r)
Expand Down
16 changes: 11 additions & 5 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,27 @@ func Test_shardSplitter(t *testing.T) {
lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding.
shouldShard: false,
},
{
desc: "default",
lookback: 0,
shouldShard: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
var didShard bool

splitter := &shardSplitter{
shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
didShard = true
return mockHandler(lokiResps[0], nil).Do(ctx, req)
}),
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
MinShardingLookback: tc.lookback,
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
limits: fakeLimits{
minShardingLookback: tc.lookback,
},
}

resp, err := splitter.Do(context.Background(), req)
resp, err := splitter.Do(user.InjectOrgID(context.Background(), "1"), req)
require.Nil(t, err)

require.Equal(t, tc.shouldShard, didShard)
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func TestLogFilterTripperware(t *testing.T) {
}

func TestInstantQueryTripperware(t *testing.T) {

testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil)
Expand Down Expand Up @@ -552,6 +551,7 @@ type fakeLimits struct {
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
minShardingLookback time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand Down Expand Up @@ -588,6 +588,10 @@ func (f fakeLimits) MaxQueryLookback(string) time.Duration {
return 0
}

func (f fakeLimits) MinShardingLookback(string) time.Duration {
return f.minShardingLookback
}

func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex
Expand Down
11 changes: 10 additions & 1 deletion pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Limits struct {
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`

// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
Expand Down Expand Up @@ -115,6 +116,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests")

_ = l.MinShardingLookback.Set("0s")
f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit the sharding time range.Queries with time range that fall between now and now minus the sharding lookback are not sharded. 0 to disable.")

_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

Expand Down Expand Up @@ -311,6 +315,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery
}

// MinShardingLookback returns the tenant specific min sharding lookback (e.g from when we should start sharding).
func (o *Overrides) MinShardingLookback(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MinShardingLookback)
}

// QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend.
func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
Expand Down