From 44e3ebcf55afc0c8f29bc8babce32d1d439f1725 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Fri, 15 Mar 2024 16:29:40 +0100 Subject: [PATCH] ingester: Enforce limits for Push requests coming via ingest.Store (#7621) * ingester: enforce limits for Push requests coming via ingest.Store Signed-off-by: Vladimir Varankin * ingester: don't over-count on calls to finish Signed-off-by: Vladimir Varankin * ingester: improve phrasing in the comments Signed-off-by: Vladimir Varankin --------- Signed-off-by: Vladimir Varankin --- pkg/ingester/ingester.go | 76 +++++++++++---- pkg/ingester/ingester_test.go | 151 ++++++++++++++++++------------ pkg/mimir/grpc_push_check.go | 14 +-- pkg/mimir/grpc_push_check_test.go | 11 ++- 4 files changed, 157 insertions(+), 95 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1c5033551e9..51fea5b2b4b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -891,33 +891,64 @@ type pushStats struct { perMetricSeriesLimitCount int } +type ctxKey int + +var pushReqCtxKey ctxKey = 1 + +type pushRequestState struct { + requestSize int64 +} + // StartPushRequest checks if ingester can start push request, and increments relevant counters. -// If new push request cannot be started, errors converible to gRPC status code are returned, and metrics are updated. -// -// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. from gRPC server's method limiter. +// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. +func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { + ctx, _, err := i.startPushRequest(ctx, reqSize) + return ctx, err +} + +func (i *Ingester) FinishPushRequest(ctx context.Context) { + st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState) + if !ok { + return + } + i.finishPushRequest(st.requestSize) +} + +// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. Ingester.StartPushRequest via gRPC server's method limiter. // // In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper. -// // In the second case, returned errors will not be logged, because request will not reach any middleware. -func (i *Ingester) StartPushRequest(requestSize int64) error { +// +// The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that. +func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) { if err := i.checkAvailable(); err != nil { - return err + return nil, false, err } + if _, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok { + // If state is already in context, this means we already passed through StartPushRequest for this request. + return ctx, false, nil + } + st := &pushRequestState{ + requestSize: reqSize, + } + ctx = context.WithValue(ctx, pushReqCtxKey, st) + inflight := i.inflightPushRequests.Inc() inflightBytes := int64(0) rejectEqualInflightBytes := false - if requestSize > 0 { - inflightBytes = i.inflightPushRequestsBytes.Add(requestSize) + if reqSize > 0 { + inflightBytes = i.inflightPushRequestsBytes.Add(reqSize) } else { inflightBytes = i.inflightPushRequestsBytes.Load() rejectEqualInflightBytes = true // if inflightBytes == limit, reject new request } finishRequestInDefer := true + defer func() { if finishRequestInDefer { - i.FinishPushRequest(requestSize) + i.finishPushRequest(reqSize) } }() @@ -925,32 +956,32 @@ func (i *Ingester) StartPushRequest(requestSize int64) error { if il != nil { if il.MaxInflightPushRequests > 0 && inflight > il.MaxInflightPushRequests { i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests).Inc() - return errMaxInflightRequestsReached + return nil, false, errMaxInflightRequestsReached } if il.MaxInflightPushRequestsBytes > 0 { if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes { i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc() - return errMaxInflightRequestsBytesReached + return nil, false, errMaxInflightRequestsBytesReached } } if il.MaxIngestionRate > 0 { if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate { i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc() - return errMaxIngestionRateReached + return nil, false, errMaxIngestionRateReached } } } finishRequestInDefer = false - return nil + return ctx, true, nil } -func (i *Ingester) FinishPushRequest(requestSize int64) { +func (i *Ingester) finishPushRequest(reqSize int64) { i.inflightPushRequests.Dec() - if requestSize > 0 { - i.inflightPushRequestsBytes.Sub(requestSize) + if reqSize > 0 { + i.inflightPushRequestsBytes.Sub(reqSize) } } @@ -960,13 +991,18 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // retain anything from `req` past the exit from this function. defer cleanUp() - // If we're using grpc handlers, we don't need to start/finish request here. - if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter { + // Only start/finish request here when the request comes NOT from grpc handlers (i.e., from ingest.Store). + // NOTE: request coming from grpc handler may end up calling start multiple times during its lifetime (e.g., when migrating to ingest storage). + // startPushRequest handles this. + if i.cfg.IngestStorageConfig.Enabled || !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter { reqSize := int64(req.Size()) - if err := i.StartPushRequest(reqSize); err != nil { + _, shouldFinish, err := i.startPushRequest(ctx, reqSize) + if err != nil { return middleware.DoNotLogError{Err: err} } - defer i.FinishPushRequest(reqSize) + if shouldFinish { + defer i.finishPushRequest(reqSize) + } } userID, err := tenant.TenantID(ctx) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 85e872be87d..ef022e1da0c 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -6821,83 +6821,114 @@ func TestIngester_inflightPushRequests(t *testing.T) { t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) { limits := InstanceLimits{MaxInflightPushRequests: 1} - // Create a mocked ingester cfg := defaultIngesterTestConfig(t) cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } + // Create a mocked ingester reg := prometheus.NewPedanticRegistry() i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - // Wait until the ingester is healthy - test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return i.lifecycler.HealthyInstancesCount() - }) + testIngesterInflightPushRequests(t, i, reg, grpcLimitEnabled) + }) + } - ctx := user.InjectOrgID(context.Background(), "test") + t.Run("gRPC limit enabled with ingest storage enabled", func(t *testing.T) { + limits := InstanceLimits{MaxInflightPushRequests: 1} - startCh := make(chan struct{}) + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) - const targetRequestDuration = time.Second + cfg := defaultIngesterTestConfig(t) + cfg.LimitInflightRequestsUsingGrpcMethodLimiter = true + cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration) + reg := prometheus.NewPedanticRegistry() + i, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, reg) - // Signal that we're going to do the real push now. - close(startCh) + // Re-enable push gRPC method to simulate migration period, when ingester can receive requests from gRPC + i.cfg.PushGrpcMethodEnabled = true - var err error + testIngesterInflightPushRequests(t, i, reg, cfg.LimitInflightRequestsUsingGrpcMethodLimiter) + }) +} - if grpcLimitEnabled { - _, err = pushWithSimulatedGRPCHandler(ctx, i, req) - } else { - _, err = i.Push(ctx, req) - } +func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus.Gatherer, grpcLimitEnabled bool) { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + }) - return err - }) + // Wait until the ingester is healthy + test.Poll(t, 100*time.Millisecond, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) - g.Go(func() error { - req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024) + ctx := user.InjectOrgID(context.Background(), "test") - select { - case <-ctx.Done(): - // failed to setup - case <-startCh: - // we can start the test. - } + startCh := make(chan struct{}) - test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} { - return i.inflightPushRequests.Load() - }) + const targetRequestDuration = time.Second - if grpcLimitEnabled { - _, err := pushWithSimulatedGRPCHandler(ctx, i, req) - require.ErrorIs(t, err, errMaxInflightRequestsReached) - } else { - _, err := i.Push(ctx, req) - require.ErrorIs(t, err, errMaxInflightRequestsReached) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration) - var optional middleware.OptionalLogging - require.ErrorAs(t, err, &optional) - require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()") + // Signal that we're going to do the real push now. + close(startCh) - s, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok, "expected to be able to convert to gRPC status") - require.Equal(t, codes.Unavailable, s.Code()) - } + var err error - return nil - }) + if grpcLimitEnabled { + _, err = pushWithSimulatedGRPCHandler(ctx, i, req) + } else { + _, err = i.Push(ctx, req) + } - require.NoError(t, g.Wait()) + return err + }) - // Ensure the rejected request has been tracked in a metric. - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + g.Go(func() error { + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024) + + select { + case <-ctx.Done(): + // failed to setup + case <-startCh: + // we can start the test. + } + + test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} { + return i.inflightPushRequests.Load() + }) + + if grpcLimitEnabled { + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) + require.ErrorIs(t, err, errMaxInflightRequestsReached) + } else { + _, err := i.Push(ctx, req) + require.ErrorIs(t, err, errMaxInflightRequestsReached) + + var optional middleware.OptionalLogging + require.ErrorAs(t, err, &optional) + require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()") + + s, ok := grpcutil.ErrorToStatus(err) + require.True(t, ok, "expected to be able to convert to gRPC status") + require.Equal(t, codes.Unavailable, s.Code()) + } + + return nil + }) + + require.NoError(t, g.Wait()) + + // Ensure the rejected request has been tracked in a metric. + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester. + # TYPE cortex_ingester_inflight_push_requests gauge + cortex_ingester_inflight_push_requests 0 # HELP cortex_ingester_instance_rejected_requests_total Requests rejected for hitting per-instance limits # TYPE cortex_ingester_instance_rejected_requests_total counter cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests"} 1 @@ -6905,9 +6936,7 @@ func TestIngester_inflightPushRequests(t *testing.T) { cortex_ingester_instance_rejected_requests_total{reason="ingester_max_ingestion_rate"} 0 cortex_ingester_instance_rejected_requests_total{reason="ingester_max_series"} 0 cortex_ingester_instance_rejected_requests_total{reason="ingester_max_tenants"} 0 - `), "cortex_ingester_instance_rejected_requests_total")) - }) - } + `), "cortex_ingester_instance_rejected_requests_total", "cortex_ingester_inflight_push_requests")) } func TestIngester_inflightPushRequestsBytes(t *testing.T) { @@ -6989,11 +7018,11 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) { `, requestSize)), "cortex_ingester_inflight_push_requests_bytes")) // Starting push request fails - err = i.StartPushRequest(100) + _, err = i.StartPushRequest(ctx, 100) require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) // Starting push request with unknown size fails - err = i.StartPushRequest(0) + _, err = i.StartPushRequest(ctx, 0) require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) // Sending push request fails @@ -10039,13 +10068,11 @@ func (c *mockContext) Value(key any) interface{} { } func pushWithSimulatedGRPCHandler(ctx context.Context, i *Ingester, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { - err := i.StartPushRequest(int64(req.Size())) + ctx, err := i.StartPushRequest(ctx, int64(req.Size())) if err != nil { return nil, err } + defer i.FinishPushRequest(ctx) - res, err := i.Push(ctx, req) - i.FinishPushRequest(int64(req.Size())) - - return res, err + return i.Push(ctx, req) } diff --git a/pkg/mimir/grpc_push_check.go b/pkg/mimir/grpc_push_check.go index 4905f7ed938..17fe313e61a 100644 --- a/pkg/mimir/grpc_push_check.go +++ b/pkg/mimir/grpc_push_check.go @@ -18,8 +18,8 @@ import ( ) type ingesterPushReceiver interface { - StartPushRequest(requestSize int64) error - FinishPushRequest(requestSize int64) + StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error) + FinishPushRequest(ctx context.Context) } // Interface exposed by Distributor. @@ -43,8 +43,7 @@ type grpcInflightMethodLimiter struct { type ctxKey int const ( - pushTypeCtxKey ctxKey = 1 // ingester or distributor push - ingesterPushRequestSizeCtxKey ctxKey = 2 + pushTypeCtxKey ctxKey = 1 // ingester or distributor push pushTypeIngester = 1 pushTypeDistributor = 2 @@ -66,12 +65,11 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN reqSize := getMessageSize(md, grpcutil.MetadataMessageSize) - err := ing.StartPushRequest(reqSize) + ctx, err := ing.StartPushRequest(ctx, reqSize) if err != nil { return ctx, status.Error(codes.Unavailable, err.Error()) } - ctx = context.WithValue(ctx, ingesterPushRequestSizeCtxKey, reqSize) return context.WithValue(ctx, pushTypeCtxKey, pushTypeIngester), nil } @@ -102,9 +100,7 @@ func (g *grpcInflightMethodLimiter) RPCCallFinished(ctx context.Context) { if pt, ok := ctx.Value(pushTypeCtxKey).(int); ok { switch pt { case pushTypeIngester: - // Using two-outputs here to avoid panics, if value is not of int64 type. reqSize will be 0 in that case, which is fine. - reqSize, _ := ctx.Value(ingesterPushRequestSizeCtxKey).(int64) - g.getIngester().FinishPushRequest(reqSize) + g.getIngester().FinishPushRequest(ctx) case pushTypeDistributor: g.getDistributor().FinishPushRequest(ctx) diff --git a/pkg/mimir/grpc_push_check_test.go b/pkg/mimir/grpc_push_check_test.go index 24915dfcb2b..1fb415e759c 100644 --- a/pkg/mimir/grpc_push_check_test.go +++ b/pkg/mimir/grpc_push_check_test.go @@ -222,6 +222,8 @@ func TestGrpcInflightMethodLimiter(t *testing.T) { } type mockIngesterReceiver struct { + lastRequestSize int64 + startCalls int startBytes int64 finishCalls int @@ -229,15 +231,16 @@ type mockIngesterReceiver struct { returnError error } -func (i *mockIngesterReceiver) StartPushRequest(size int64) error { +func (i *mockIngesterReceiver) StartPushRequest(ctx context.Context, size int64) (context.Context, error) { + i.lastRequestSize = size i.startCalls++ i.startBytes += size - return i.returnError + return ctx, i.returnError } -func (i *mockIngesterReceiver) FinishPushRequest(size int64) { +func (i *mockIngesterReceiver) FinishPushRequest(_ context.Context) { i.finishCalls++ - i.finishBytes += size + i.finishBytes += i.lastRequestSize } type mockDistributorReceiver struct {