Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: Enforce limits for Push requests coming via ingest.Store #7621

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,66 +891,97 @@ type pushStats struct {
perMetricSeriesLimitCount int
}

// ctxKey is the unexported type of the context key(s) declared in this file.
type ctxKey int

// pushReqCtxKey is the context key under which startPushRequest stores the *pushRequestState of a push request.
var pushReqCtxKey ctxKey = 1

// pushRequestState is the per-request state attached to the context once a push request has been started.
type pushRequestState struct {
// requestSize is the size given to startPushRequest; FinishPushRequest passes it back to finishPushRequest.
requestSize int64
}

// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors converible to gRPC status code are returned, and metrics are updated.
//
// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. from gRPC server's method limiter.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
// The shouldFinish result is discarded here; callers are expected to pair this call with FinishPushRequest on the returned context.
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err
}

// FinishPushRequest finishes the push request whose state is stored in ctx.
// It looks up the *pushRequestState stored under pushReqCtxKey and passes its requestSize to finishPushRequest.
// If ctx carries no such state, it does nothing.
func (i *Ingester) FinishPushRequest(ctx context.Context) {
st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState)
if !ok {
// No state in the context: nothing to finish.
return
}
i.finishPushRequest(st.requestSize)
}

// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. Ingester.StartPushRequest via gRPC server's method limiter.
//
// In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper.
//
// In the second case, returned errors will not be logged, because request will not reach any middleware.
func (i *Ingester) StartPushRequest(requestSize int64) error {
//
// The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that.
func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) {
if err := i.checkAvailable(); err != nil {
return err
return nil, false, err
}

if _, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok {
// If state is already in context, this means we already passed through StartPushRequest for this request.
return ctx, false, nil
}
st := &pushRequestState{
requestSize: reqSize,
}
ctx = context.WithValue(ctx, pushReqCtxKey, st)

inflight := i.inflightPushRequests.Inc()
inflightBytes := int64(0)
rejectEqualInflightBytes := false
if requestSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(requestSize)
if reqSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(reqSize)
} else {
inflightBytes = i.inflightPushRequestsBytes.Load()
rejectEqualInflightBytes = true // if inflightBytes == limit, reject new request
}

finishRequestInDefer := true

defer func() {
if finishRequestInDefer {
i.FinishPushRequest(requestSize)
i.finishPushRequest(reqSize)
}
}()

il := i.getInstanceLimits()
if il != nil {
if il.MaxInflightPushRequests > 0 && inflight > il.MaxInflightPushRequests {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests).Inc()
return errMaxInflightRequestsReached
return nil, false, errMaxInflightRequestsReached
}

if il.MaxInflightPushRequestsBytes > 0 {
if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc()
return errMaxInflightRequestsBytesReached
return nil, false, errMaxInflightRequestsBytesReached
}
}

if il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc()
return errMaxIngestionRateReached
return nil, false, errMaxIngestionRateReached
}
}
}

finishRequestInDefer = false
return nil
return ctx, true, nil
}

func (i *Ingester) FinishPushRequest(requestSize int64) {
func (i *Ingester) finishPushRequest(reqSize int64) {
i.inflightPushRequests.Dec()
if requestSize > 0 {
i.inflightPushRequestsBytes.Sub(requestSize)
if reqSize > 0 {
i.inflightPushRequestsBytes.Sub(reqSize)
}
}

Expand All @@ -960,13 +991,18 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// retain anything from `req` past the exit from this function.
defer cleanUp()

// If we're using grpc handlers, we don't need to start/finish request here.
if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
// Only start/finish request here when the request comes NOT from grpc handlers (i.e., from ingest.Store).
// NOTE: request coming from grpc handler may end up calling start multiple times during its lifetime (e.g., when migrating to ingest storage).
// startPushRequest handles this.
if i.cfg.IngestStorageConfig.Enabled || !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
reqSize := int64(req.Size())
if err := i.StartPushRequest(reqSize); err != nil {
_, shouldFinish, err := i.startPushRequest(ctx, reqSize)
if err != nil {
return middleware.DoNotLogError{Err: err}
}
defer i.FinishPushRequest(reqSize)
if shouldFinish {
defer i.finishPushRequest(reqSize)
}
}

userID, err := tenant.TenantID(ctx)
Expand Down
151 changes: 89 additions & 62 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6821,93 +6821,122 @@ func TestIngester_inflightPushRequests(t *testing.T) {
t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) {
limits := InstanceLimits{MaxInflightPushRequests: 1}

// Create a mocked ingester
cfg := defaultIngesterTestConfig(t)
cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }

// Create a mocked ingester
reg := prometheus.NewPedanticRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})
testIngesterInflightPushRequests(t, i, reg, grpcLimitEnabled)
})
}

ctx := user.InjectOrgID(context.Background(), "test")
t.Run("gRPC limit enabled with ingest storage enabled", func(t *testing.T) {
limits := InstanceLimits{MaxInflightPushRequests: 1}

startCh := make(chan struct{})
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

const targetRequestDuration = time.Second
cfg := defaultIngesterTestConfig(t)
cfg.LimitInflightRequestsUsingGrpcMethodLimiter = true
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration)
reg := prometheus.NewPedanticRegistry()
i, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, reg)

// Signal that we're going to do the real push now.
close(startCh)
// Re-enable push gRPC method to simulate migration period, when ingester can receive requests from gRPC
i.cfg.PushGrpcMethodEnabled = true

var err error
testIngesterInflightPushRequests(t, i, reg, cfg.LimitInflightRequestsUsingGrpcMethodLimiter)
})
}

if grpcLimitEnabled {
_, err = pushWithSimulatedGRPCHandler(ctx, i, req)
} else {
_, err = i.Push(ctx, req)
}
func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus.Gatherer, grpcLimitEnabled bool) {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
})

return err
})
// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

g.Go(func() error {
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
ctx := user.InjectOrgID(context.Background(), "test")

select {
case <-ctx.Done():
// failed to setup
case <-startCh:
// we can start the test.
}
startCh := make(chan struct{})

test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} {
return i.inflightPushRequests.Load()
})
const targetRequestDuration = time.Second

if grpcLimitEnabled {
_, err := pushWithSimulatedGRPCHandler(ctx, i, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
} else {
_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration)

var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")
// Signal that we're going to do the real push now.
close(startCh)

s, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
}
var err error

return nil
})
if grpcLimitEnabled {
_, err = pushWithSimulatedGRPCHandler(ctx, i, req)
} else {
_, err = i.Push(ctx, req)
}

require.NoError(t, g.Wait())
return err
})

// Ensure the rejected request has been tracked in a metric.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
g.Go(func() error {
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)

select {
case <-ctx.Done():
// failed to setup
case <-startCh:
// we can start the test.
}

test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} {
return i.inflightPushRequests.Load()
})

if grpcLimitEnabled {
_, err := pushWithSimulatedGRPCHandler(ctx, i, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
} else {
_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)

var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")

s, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
}

return nil
})

require.NoError(t, g.Wait())

// Ensure the rejected request has been tracked in a metric.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
# TYPE cortex_ingester_inflight_push_requests gauge
cortex_ingester_inflight_push_requests 0
# HELP cortex_ingester_instance_rejected_requests_total Requests rejected for hitting per-instance limits
# TYPE cortex_ingester_instance_rejected_requests_total counter
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests"} 1
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests_bytes"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_ingestion_rate"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_series"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_tenants"} 0
`), "cortex_ingester_instance_rejected_requests_total"))
})
}
`), "cortex_ingester_instance_rejected_requests_total", "cortex_ingester_inflight_push_requests"))
}

func TestIngester_inflightPushRequestsBytes(t *testing.T) {
Expand Down Expand Up @@ -6989,11 +7018,11 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
`, requestSize)), "cortex_ingester_inflight_push_requests_bytes"))

// Starting push request fails
err = i.StartPushRequest(100)
_, err = i.StartPushRequest(ctx, 100)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Starting push request with unknown size fails
err = i.StartPushRequest(0)
_, err = i.StartPushRequest(ctx, 0)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Sending push request fails
Expand Down Expand Up @@ -10039,13 +10068,11 @@ func (c *mockContext) Value(key any) interface{} {
}

func pushWithSimulatedGRPCHandler(ctx context.Context, i *Ingester, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
err := i.StartPushRequest(int64(req.Size()))
ctx, err := i.StartPushRequest(ctx, int64(req.Size()))
if err != nil {
return nil, err
}
defer i.FinishPushRequest(ctx)

res, err := i.Push(ctx, req)
i.FinishPushRequest(int64(req.Size()))

return res, err
return i.Push(ctx, req)
}
14 changes: 5 additions & 9 deletions pkg/mimir/grpc_push_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
)

type ingesterPushReceiver interface {
StartPushRequest(requestSize int64) error
FinishPushRequest(requestSize int64)
StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error)
FinishPushRequest(ctx context.Context)
}
Comment on lines 20 to 23
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a nice side effect of these changes is that we no longer need two separate interfaces, ingesterPushReceiver and distributorPushReceiver. I kept both in place here, but I want to refactor this chunk further later.


// Interface exposed by Distributor.
Expand All @@ -43,8 +43,7 @@ type grpcInflightMethodLimiter struct {
type ctxKey int

const (
pushTypeCtxKey ctxKey = 1 // ingester or distributor push
ingesterPushRequestSizeCtxKey ctxKey = 2
pushTypeCtxKey ctxKey = 1 // ingester or distributor push

pushTypeIngester = 1
pushTypeDistributor = 2
Expand All @@ -66,12 +65,11 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN

reqSize := getMessageSize(md, grpcutil.MetadataMessageSize)

err := ing.StartPushRequest(reqSize)
ctx, err := ing.StartPushRequest(ctx, reqSize)
if err != nil {
return ctx, status.Error(codes.Unavailable, err.Error())
}

ctx = context.WithValue(ctx, ingesterPushRequestSizeCtxKey, reqSize)
return context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester), nil
}

Expand Down Expand Up @@ -102,9 +100,7 @@ func (g *grpcInflightMethodLimiter) RPCCallFinished(ctx context.Context) {
if pt, ok := ctx.Value(pushTypeCtxKey).(int); ok {
switch pt {
case pushTypeIngester:
// Using two-outputs here to avoid panics, if value is not of int64 type. reqSize will be 0 in that case, which is fine.
reqSize, _ := ctx.Value(ingesterPushRequestSizeCtxKey).(int64)
g.getIngester().FinishPushRequest(reqSize)
g.getIngester().FinishPushRequest(ctx)

case pushTypeDistributor:
g.getDistributor().FinishPushRequest(ctx)
Expand Down
Loading
Loading