Skip to content

Commit

Permalink
Add min_sharding_lookback limits to the frontends (#4047)
Browse files Browse the repository at this point in the history
* Add min_sharding_lookback limits to the frontends

Signed-off-by: Cyril Tovena <[email protected]>

* improve doc.

Signed-off-by: Cyril Tovena <[email protected]>

* lint

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena committed Jul 26, 2021
1 parent b830d65 commit 388ae97
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ The queryrange_config configures the query splitting and caching in the Loki que
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]
# Limit queries that can be sharded.
# Queries with time range that fall between now and now minus the sharding lookback are not sharded.
# Default value is 0s (disable), meaning all queries of all time range are sharded.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]
# Deprecated: Split queries by day and execute in parallel. Use -querier.split-queries-by-interval instead.
# CLI flag: -querier.split-queries-by-day
[split_queries_by_day: <boolean> | default = false]
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logql"
)

const (
Expand All @@ -22,9 +24,11 @@ const (
// Limits extends the cortex limits interface with support for per tenant splitby parameters
type Limits interface {
queryrange.Limits
logql.Limits
QuerySplitDuration(string) time.Duration
MaxQuerySeries(string) int
MaxEntriesLimitPerQuery(string) int
MinShardingLookback(string) time.Duration
}

type limits struct {
Expand Down
37 changes: 26 additions & 11 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package queryrange
import (
"context"
"fmt"
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
Expand All @@ -26,7 +29,7 @@ func NewQueryShardMiddleware(
minShardingLookback time.Duration,
middlewareMetrics *queryrange.InstrumentMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
limits logql.Limits,
limits Limits,
) queryrange.Middleware {

noshards := !hasShards(confs)
Expand All @@ -45,10 +48,15 @@ func NewQueryShardMiddleware(
})

return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next)
return &shardSplitter{
limits: limits,
shardingware: queryrange.MergeMiddlewares(
queryrange.InstrumentMiddleware("shardingware", middlewareMetrics),
mapperware,
).Wrap(next),
now: time.Now,
next: queryrange.InstrumentMiddleware("sharding-bypass", middlewareMetrics).Wrap(next),
}
})
}

Expand Down Expand Up @@ -156,15 +164,22 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
// This is used to send nonsharded requests to the ingesters in order to not overload them.
// TODO(owen-d): export in cortex so we don't duplicate code
type shardSplitter struct {
MinShardingLookback time.Duration // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
limits Limits // delimiter for splitting sharded vs non-sharded queries
shardingware queryrange.Handler // handler for sharded queries
next queryrange.Handler // handler for non-sharded queries
now func() time.Time // injectable time.Now
}

func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
cutoff := splitter.now().Add(-splitter.MinShardingLookback)

userid, err := tenant.TenantID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
minShardingLookback := splitter.limits.MinShardingLookback(userid)
if minShardingLookback == 0 {
return splitter.shardingware.Do(ctx, r)
}
cutoff := splitter.now().Add(-minShardingLookback)
// Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried).
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) {
return splitter.next.Do(ctx, r)
Expand Down
16 changes: 11 additions & 5 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,27 @@ func Test_shardSplitter(t *testing.T) {
lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding.
shouldShard: false,
},
{
desc: "default",
lookback: 0,
shouldShard: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
var didShard bool

splitter := &shardSplitter{
shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
didShard = true
return mockHandler(lokiResps[0], nil).Do(ctx, req)
}),
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
MinShardingLookback: tc.lookback,
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
limits: fakeLimits{
minShardingLookback: tc.lookback,
},
}

resp, err := splitter.Do(context.Background(), req)
resp, err := splitter.Do(user.InjectOrgID(context.Background(), "1"), req)
require.Nil(t, err)

require.Equal(t, tc.shouldShard, didShard)
Expand Down
51 changes: 45 additions & 6 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ var (

// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {

tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
Expand All @@ -101,7 +100,7 @@ func TestMetricsTripperware(t *testing.T) {
lreq := &LokiRequest{
Query: `rate({app="foo"} |= "foo"[1m])`,
Limit: 1000,
Step: 30000, //30sec
Step: 30000, // 30sec
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Expand Down Expand Up @@ -155,7 +154,6 @@ func TestMetricsTripperware(t *testing.T) {
}

func TestLogFilterTripperware(t *testing.T) {

tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
Expand All @@ -182,7 +180,7 @@ func TestLogFilterTripperware(t *testing.T) {
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

//testing limit
// testing limit
count, h := promqlResult(streams)
rt.setHandler(h)
_, err = tpw(rt).RoundTrip(req)
Expand All @@ -202,8 +200,45 @@ func TestLogFilterTripperware(t *testing.T) {
require.Error(t, err)
}

func TestSeriesTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()

lreq := &LokiInstantRequest{
Query: `sum by (job) (bytes_rate({cluster="dev-us-central-0"}[15m]))`,
Limit: 1000,
Direction: logproto.FORWARD,
Path: "/loki/api/v1/query",
}

ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)

req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

count, h := promqlResult(vector)
rt.setHandler(h)
resp, err := tpw(rt).RoundTrip(req)
require.Equal(t, 1, *count)
require.NoError(t, err)

lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
require.IsType(t, &LokiPromResponse{}, lokiResponse)
}

func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
Expand Down Expand Up @@ -245,7 +280,6 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {

tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
Expand Down Expand Up @@ -495,6 +529,7 @@ type fakeLimits struct {
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
minShardingLookback time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand Down Expand Up @@ -531,6 +566,10 @@ func (f fakeLimits) MaxQueryLookback(string) time.Duration {
return 0
}

func (f fakeLimits) MinShardingLookback(string) time.Duration {
return f.minShardingLookback
}

func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex
Expand Down
11 changes: 10 additions & 1 deletion pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Limits struct {
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`

// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
Expand Down Expand Up @@ -115,6 +116,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests")

_ = l.MinShardingLookback.Set("0s")
f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit the sharding time range.Queries with time range that fall between now and now minus the sharding lookback are not sharded. 0 to disable.")

_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

Expand Down Expand Up @@ -311,6 +315,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery
}

// MinShardingLookback returns the tenant specific min sharding lookback (e.g from when we should start sharding).
func (o *Overrides) MinShardingLookback(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MinShardingLookback)
}

// QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend.
func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
Expand Down

0 comments on commit 388ae97

Please sign in to comment.