Skip to content


ingester: Enforce limits for Push requests coming via ingest.Store (#…
Browse files Browse the repository at this point in the history

* ingester: enforce limits for Push requests coming via ingest.Store

Signed-off-by: Vladimir Varankin <[email protected]>

* ingester: don't over-count on calls to finish

Signed-off-by: Vladimir Varankin <[email protected]>

* ingester: improve phrasing in the comments

Signed-off-by: Vladimir Varankin <[email protected]>


Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo authored Mar 15, 2024
1 parent a1dab9e commit 44e3ebc
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 95 deletions.
76 changes: 56 additions & 20 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,66 +891,97 @@ type pushStats struct {
perMetricSeriesLimitCount int

type ctxKey int

var pushReqCtxKey ctxKey = 1

type pushRequestState struct {
requestSize int64

// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors converible to gRPC status code are returned, and metrics are updated.
// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. from gRPC server's method limiter.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err

func (i *Ingester) FinishPushRequest(ctx context.Context) {
st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState)
if !ok {

// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. Ingester.StartPushRequest via gRPC server's method limiter.
// In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper.
// In the second case, returned errors will not be logged, because request will not reach any middleware.
func (i *Ingester) StartPushRequest(requestSize int64) error {
// The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that.
func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) {
if err := i.checkAvailable(); err != nil {
return err
return nil, false, err

if _, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok {
// If state is already in context, this means we already passed through StartPushRequest for this request.
return ctx, false, nil
st := &pushRequestState{
requestSize: reqSize,
ctx = context.WithValue(ctx, pushReqCtxKey, st)

inflight := i.inflightPushRequests.Inc()
inflightBytes := int64(0)
rejectEqualInflightBytes := false
if requestSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(requestSize)
if reqSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(reqSize)
} else {
inflightBytes = i.inflightPushRequestsBytes.Load()
rejectEqualInflightBytes = true // if inflightBytes == limit, reject new request

finishRequestInDefer := true

defer func() {
if finishRequestInDefer {

il := i.getInstanceLimits()
if il != nil {
if il.MaxInflightPushRequests > 0 && inflight > il.MaxInflightPushRequests {
return errMaxInflightRequestsReached
return nil, false, errMaxInflightRequestsReached

if il.MaxInflightPushRequestsBytes > 0 {
if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes {
return errMaxInflightRequestsBytesReached
return nil, false, errMaxInflightRequestsBytesReached

if il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
return errMaxIngestionRateReached
return nil, false, errMaxIngestionRateReached

finishRequestInDefer = false
return nil
return ctx, true, nil

func (i *Ingester) FinishPushRequest(requestSize int64) {
func (i *Ingester) finishPushRequest(reqSize int64) {
if requestSize > 0 {
if reqSize > 0 {

Expand All @@ -960,13 +991,18 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// retain anything from `req` past the exit from this function.
defer cleanUp()

// If we're using grpc handlers, we don't need to start/finish request here.
if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
// Only start/finish request here when the request comes NOT from grpc handlers (i.e., from ingest.Store).
// NOTE: request coming from grpc handler may end up calling start multiple times during its lifetime (e.g., when migrating to ingest storage).
// startPushRequest handles this.
if i.cfg.IngestStorageConfig.Enabled || !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
reqSize := int64(req.Size())
if err := i.StartPushRequest(reqSize); err != nil {
_, shouldFinish, err := i.startPushRequest(ctx, reqSize)
if err != nil {
return middleware.DoNotLogError{Err: err}
defer i.FinishPushRequest(reqSize)
if shouldFinish {
defer i.finishPushRequest(reqSize)

userID, err := tenant.TenantID(ctx)
Expand Down
151 changes: 89 additions & 62 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6821,93 +6821,122 @@ func TestIngester_inflightPushRequests(t *testing.T) {
t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) {
limits := InstanceLimits{MaxInflightPushRequests: 1}

// Create a mocked ingester
cfg := defaultIngesterTestConfig(t)
cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }

// Create a mocked ingester
reg := prometheus.NewPedanticRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
testIngesterInflightPushRequests(t, i, reg, grpcLimitEnabled)

ctx := user.InjectOrgID(context.Background(), "test")
t.Run("gRPC limit enabled with ingest storage enabled", func(t *testing.T) {
limits := InstanceLimits{MaxInflightPushRequests: 1}

startCh := make(chan struct{})
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

const targetRequestDuration = time.Second
cfg := defaultIngesterTestConfig(t)
cfg.LimitInflightRequestsUsingGrpcMethodLimiter = true
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration)
reg := prometheus.NewPedanticRegistry()
i, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, reg)

// Signal that we're going to do the real push now.
// Re-enable push gRPC method to simulate migration period, when ingester can receive requests from gRPC
i.cfg.PushGrpcMethodEnabled = true

var err error
testIngesterInflightPushRequests(t, i, reg, cfg.LimitInflightRequestsUsingGrpcMethodLimiter)

if grpcLimitEnabled {
_, err = pushWithSimulatedGRPCHandler(ctx, i, req)
} else {
_, err = i.Push(ctx, req)
func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus.Gatherer, grpcLimitEnabled bool) {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

return err
// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()

g.Go(func() error {
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
ctx := user.InjectOrgID(context.Background(), "test")

select {
case <-ctx.Done():
// failed to setup
case <-startCh:
// we can start the test.
startCh := make(chan struct{})

test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} {
return i.inflightPushRequests.Load()
const targetRequestDuration = time.Second

if grpcLimitEnabled {
_, err := pushWithSimulatedGRPCHandler(ctx, i, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
} else {
_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration)

var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")
// Signal that we're going to do the real push now.

s, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
var err error

return nil
if grpcLimitEnabled {
_, err = pushWithSimulatedGRPCHandler(ctx, i, req)
} else {
_, err = i.Push(ctx, req)

require.NoError(t, g.Wait())
return err

// Ensure the rejected request has been tracked in a metric.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
g.Go(func() error {
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)

select {
case <-ctx.Done():
// failed to setup
case <-startCh:
// we can start the test.

test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} {
return i.inflightPushRequests.Load()

if grpcLimitEnabled {
_, err := pushWithSimulatedGRPCHandler(ctx, i, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
} else {
_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)

var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")

s, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())

return nil

require.NoError(t, g.Wait())

// Ensure the rejected request has been tracked in a metric.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
# TYPE cortex_ingester_inflight_push_requests gauge
cortex_ingester_inflight_push_requests 0
# HELP cortex_ingester_instance_rejected_requests_total Requests rejected for hitting per-instance limits
# TYPE cortex_ingester_instance_rejected_requests_total counter
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests"} 1
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests_bytes"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_ingestion_rate"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_series"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_tenants"} 0
`), "cortex_ingester_instance_rejected_requests_total"))
`), "cortex_ingester_instance_rejected_requests_total", "cortex_ingester_inflight_push_requests"))

func TestIngester_inflightPushRequestsBytes(t *testing.T) {
Expand Down Expand Up @@ -6989,11 +7018,11 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
`, requestSize)), "cortex_ingester_inflight_push_requests_bytes"))

// Starting push request fails
err = i.StartPushRequest(100)
_, err = i.StartPushRequest(ctx, 100)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Starting push request with unknown size fails
err = i.StartPushRequest(0)
_, err = i.StartPushRequest(ctx, 0)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Sending push request fails
Expand Down Expand Up @@ -10039,13 +10068,11 @@ func (c *mockContext) Value(key any) interface{} {

func pushWithSimulatedGRPCHandler(ctx context.Context, i *Ingester, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
err := i.StartPushRequest(int64(req.Size()))
ctx, err := i.StartPushRequest(ctx, int64(req.Size()))
if err != nil {
return nil, err
defer i.FinishPushRequest(ctx)

res, err := i.Push(ctx, req)

return res, err
return i.Push(ctx, req)
14 changes: 5 additions & 9 deletions pkg/mimir/grpc_push_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (

type ingesterPushReceiver interface {
StartPushRequest(requestSize int64) error
FinishPushRequest(requestSize int64)
StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error)
FinishPushRequest(ctx context.Context)

// Interface exposed by Distributor.
Expand All @@ -43,8 +43,7 @@ type grpcInflightMethodLimiter struct {
type ctxKey int

const (
pushTypeCtxKey ctxKey = 1 // ingester or distributor push
ingesterPushRequestSizeCtxKey ctxKey = 2
pushTypeCtxKey ctxKey = 1 // ingester or distributor push

pushTypeIngester = 1
pushTypeDistributor = 2
Expand All @@ -66,12 +65,11 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN

reqSize := getMessageSize(md, grpcutil.MetadataMessageSize)

err := ing.StartPushRequest(reqSize)
ctx, err := ing.StartPushRequest(ctx, reqSize)
if err != nil {
return ctx, status.Error(codes.Unavailable, err.Error())

ctx = context.WithValue(ctx, ingesterPushRequestSizeCtxKey, reqSize)
return context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester), nil

Expand Down Expand Up @@ -102,9 +100,7 @@ func (g *grpcInflightMethodLimiter) RPCCallFinished(ctx context.Context) {
if pt, ok := ctx.Value(pushTypeCtxKey).(int); ok {
switch pt {
case pushTypeIngester:
// Using two-outputs here to avoid panics, if value is not of int64 type. reqSize will be 0 in that case, which is fine.
reqSize, _ := ctx.Value(ingesterPushRequestSizeCtxKey).(int64)

case pushTypeDistributor:
Expand Down

0 comments on commit 44e3ebc

Please sign in to comment.