diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index b8d4ed68e9a7e..7391a8af71916 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,13 +10,10 @@ import ( "syscall" "time" - "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" - - "github.com/grafana/loki/v3/pkg/util/httpreq" - "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -27,8 +24,6 @@ import ( tsdb_record "github.com/prometheus/prometheus/tsdb/record" "go.uber.org/atomic" - "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" @@ -40,6 +35,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/storage/chunk" @@ -48,8 +44,10 @@ import ( "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/deletion" + "github.com/grafana/loki/v3/pkg/util/httpreq" util_log "github.com/grafana/loki/v3/pkg/util/log" mathutil "github.com/grafana/loki/v3/pkg/util/math" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" server_util "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/validation" ) @@ -1177,17 +1175,35 @@ func minTs(stream *logproto.Stream) model.Time { } // For each stream, we check if the stream is owned by the ingester or not and increment/decrement the owned stream count. -func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error { +func (i *instance) updateOwnedStreams(ingesterRing ring.ReadRing, ingesterID string) error { + start := time.Now() + defer func() { + i.metrics.streamsOwnershipCheck.Observe(float64(time.Since(start).Milliseconds())) + }() + var descsBuf = make([]ring.InstanceDesc, ingesterRing.ReplicationFactor()+1) + var hostsBuf = make([]string, ingesterRing.ReplicationFactor()+1) + var zoneBuf = make([]string, ingesterRing.ZonesCount()+1) var err error i.streams.WithLock(func() { i.ownedStreamsSvc.resetStreamCounts() err = i.streams.ForEach(func(s *stream) (bool, error) { - i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp))) + replicationSet, err := ingesterRing.Get(lokiring.TokenFor(i.instanceID, s.labelsString), ring.WriteNoExtend, descsBuf, hostsBuf, zoneBuf) + if err != nil { + return false, fmt.Errorf("error getting replication set for stream %s: %v", s.labelsString, err) + } + ownedStream := i.isOwnedStream(replicationSet, ingesterID) + i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedStream) return true, nil }) }) - if err != nil { - return fmt.Errorf("error checking streams ownership: %w", err) + return err +} + +func (i *instance) isOwnedStream(replicationSet ring.ReplicationSet, ingesterID string) bool { + for _, instanceDesc := range replicationSet.Instances { + if instanceDesc.Id == ingesterID { + return true + } } - return nil + return false } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index fd2a3e52bbb97..ad190285ccd08 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -65,9 +65,9 @@ type ingesterMetrics struct { // Shutdown marker for ingester scale down shutdownMarker prometheus.Gauge - flushQueueLength prometheus.Gauge - + flushQueueLength prometheus.Gauge duplicateLogBytesTotal *prometheus.CounterVec + streamsOwnershipCheck prometheus.Histogram } // setRecoveryBytesInUse bounds the bytes reports to >= 0. @@ -296,6 +296,14 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Help: "The total number of series pending in the flush queue.", }), + streamsOwnershipCheck: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "ingester_streams_ownership_check_duration_ms", + Help: "Distribution of streams ownership check durations in milliseconds.", + // 100ms to 5s. + Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000}, + }), + duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "ingester", diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 55f7eb480482b..3bb729815e718 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -11,11 +11,11 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" ) -var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ +var notOwnedStreamsMetric = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: constants.Loki, Name: "ingester_not_owned_streams", - Help: "The total number of not owned streams in memory per tenant.", -}, []string{"tenant"}) + Help: "The total number of not owned streams in memory.", +}) type ownedStreamService struct { tenantID string @@ -44,9 +44,10 @@ func (s *ownedStreamService) getOwnedStreamCount() int { return s.ownedStreamCount } -func (s *ownedStreamService) updateFixedLimit() { - limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) - s.fixedLimit.Store(int32(limit)) +func (s *ownedStreamService) updateFixedLimit() (old, new int32) { + newLimit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) + return s.fixedLimit.Swap(int32(newLimit)), int32(newLimit) + } func (s *ownedStreamService) getFixedLimit() int { @@ -60,7 +61,7 @@ func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bo s.ownedStreamCount++ return } - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc() + notOwnedStreamsMetric.Inc() s.notOwnedStreams[fp] = nil } @@ -69,7 +70,7 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) { defer s.lock.Unlock() if _, notOwned := s.notOwnedStreams[fp]; notOwned { - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec() + notOwnedStreamsMetric.Dec() delete(s.notOwnedStreams, fp) return } @@ -80,7 +81,7 @@ func (s *ownedStreamService) resetStreamCounts() { s.lock.Lock() defer s.lock.Unlock() s.ownedStreamCount = 0 - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0) + notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams))) s.notOwnedStreams = make(map[model.Fingerprint]any) } diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go index 59f8fe6b9269d..d3bf79d29f743 100644 --- a/pkg/ingester/recalculate_owned_streams.go +++ b/pkg/ingester/recalculate_owned_streams.go @@ -2,7 +2,6 @@ package ingester import ( "context" - "errors" "time" "github.com/go-kit/log" @@ -11,8 +10,6 @@ import ( "github.com/grafana/dskit/services" ) -var ownedStreamRingOp = ring.NewOp([]ring.InstanceState{ring.PENDING, ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) - type recalculateOwnedStreams struct { services.Service @@ -42,34 +39,46 @@ func (s *recalculateOwnedStreams) iteration(_ context.Context) error { } func (s *recalculateOwnedStreams) recalculate() { + level.Info(s.logger).Log("msg", "starting recalculate owned streams job") + defer func() { + s.updateFixedLimitForAll() + level.Info(s.logger).Log("msg", "completed recalculate owned streams job") + }() ringChanged, err := s.checkRingForChanges() if err != nil { level.Error(s.logger).Log("msg", "failed to check ring for changes", "err", err) return } if !ringChanged { + level.Debug(s.logger).Log("msg", "ring is not changed, skipping the job") return } - ownedTokenRange, err := s.getTokenRangesForIngester() - if err != nil { - level.Error(s.logger).Log("msg", "failed to get token ranges for ingester", "err", err) - return - } + level.Info(s.logger).Log("msg", "detected ring changes, re-evaluating streams ownership") for _, instance := range s.instancesSupplier() { if !instance.limiter.limits.UseOwnedStreamCount(instance.instanceID) { continue } - err = instance.updateOwnedStreams(ownedTokenRange) + + level.Info(s.logger).Log("msg", "updating streams ownership", "tenant", instance.instanceID) + err := instance.updateOwnedStreams(s.ingestersRing, s.ingesterID) if err != nil { - level.Error(s.logger).Log("msg", "failed to update owned streams", "err", err) + level.Error(s.logger).Log("msg", "failed to re-evaluate streams ownership", "tenant", instance.instanceID, "err", err) + } + } +} + +func (s *recalculateOwnedStreams) updateFixedLimitForAll() { + for _, instance := range s.instancesSupplier() { + oldLimit, newLimit := instance.ownedStreamsSvc.updateFixedLimit() + if oldLimit != newLimit { + level.Info(s.logger).Log("msg", "fixed limit has been updated", "tenant", instance.instanceID, "old", oldLimit, "new", newLimit) } - instance.ownedStreamsSvc.updateFixedLimit() } } func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { - rs, err := s.ingestersRing.GetAllHealthy(ownedStreamRingOp) + rs, err := s.ingestersRing.GetAllHealthy(ring.WriteNoExtend) if err != nil { return false, err } @@ -78,15 +87,3 @@ func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { s.previousRing = rs return ringChanged, nil } - -func (s *recalculateOwnedStreams) getTokenRangesForIngester() (ring.TokenRanges, error) { - ranges, err := s.ingestersRing.GetTokenRangesForInstance(s.ingesterID) - if err != nil { - if errors.Is(err, ring.ErrInstanceNotFound) { - return nil, nil - } - return nil, err - } - - return ranges, nil -} diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index dda027ca2e443..91b32baef820d 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "fmt" "strconv" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/runtime" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" "github.com/grafana/loki/v3/pkg/validation" ) @@ -49,13 +51,16 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - mockRing := &readRingMock{ - replicationSet: ring.ReplicationSet{ - Instances: []ring.InstanceDesc{{Addr: "ingester-0", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, - }, - tokenRangesByIngester: map[string]ring.TokenRanges{ - // this ingester owns token ranges [50, 100] and [200, 300] - "ingester-0": {50, 100, 200, 300}, + currentIngesterName := "ingester-0" + tenantName := "tenant-a" + + mockRing := &mockStreamsOwnershipRing{ + currentIngesterName: currentIngesterName, + tenantName: tenantName, + readRingMock: readRingMock{ + replicationSet: ring.ReplicationSet{ + Instances: []ring.InstanceDesc{{Addr: currentIngesterName, Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, + }, }, } @@ -69,7 +74,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { tenant, err := newInstance( defaultConfig(), defaultPeriodConfigs, - "tenant-a", + tenantName, limiter, runtime.DefaultTenantConfigs(), noopWAL{}, @@ -85,22 +90,22 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { require.NoError(t, err) require.Equal(t, 100, tenant.ownedStreamsSvc.getFixedLimit(), "MaxGlobalStreamsPerUser is 100 at this moment") // not owned streams - createStream(t, tenant, 49) - createStream(t, tenant, 101) - createStream(t, tenant, 301) + mockRing.addMapping(createStream(t, tenant, 49), false) + mockRing.addMapping(createStream(t, tenant, 101), false) + mockRing.addMapping(createStream(t, tenant, 301), false) // owned streams - createStream(t, tenant, 50) - createStream(t, tenant, 60) - createStream(t, tenant, 100) - createStream(t, tenant, 250) + mockRing.addMapping(createStream(t, tenant, 50), true) + mockRing.addMapping(createStream(t, tenant, 60), true) + mockRing.addMapping(createStream(t, tenant, 100), true) + mockRing.addMapping(createStream(t, tenant, 250), true) require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} - service := newRecalculateOwnedStreams(mockTenantsSupplier.get, "ingester-0", mockRing, 50*time.Millisecond, log.NewNopLogger()) + service := newRecalculateOwnedStreams(mockTenantsSupplier.get, currentIngesterName, mockRing, 50*time.Millisecond, log.NewNopLogger()) //change the limit to assert that fixed limit is updated after the recalculation limits.DefaultLimits().MaxGlobalStreamsPerUser = 50 @@ -116,6 +121,38 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { } +type mockStreamsOwnershipRing struct { + readRingMock + currentIngesterName string + tenantName string + streamMapping map[uint32]ring.ReplicationSet +} + +func (r *mockStreamsOwnershipRing) addMapping(stream *stream, owned bool) { + instanceDescs := make([]ring.InstanceDesc, 0, 3) + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-444"}) + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-555"}) + if owned { + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: r.currentIngesterName}) + } else { + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-333"}) + } + if r.streamMapping == nil { + r.streamMapping = make(map[uint32]ring.ReplicationSet) + } + r.streamMapping[lokiring.TokenFor(r.tenantName, stream.labelsString)] = ring.ReplicationSet{ + Instances: instanceDescs, + } +} + +func (r *mockStreamsOwnershipRing) Get(streamToken uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { + set, found := r.streamMapping[streamToken] + if !found { + return ring.ReplicationSet{}, fmt.Errorf("replication set mapping is not found for stream hash: %v", streamToken) + } + return set, nil +} + func Test_recalculateOwnedStreams_checkRingForChanges(t *testing.T) { mockRing := &readRingMock{ replicationSet: ring.ReplicationSet{ @@ -141,14 +178,14 @@ func Test_recalculateOwnedStreams_checkRingForChanges(t *testing.T) { require.True(t, ringChanged) } -func createStream(t *testing.T, inst *instance, fingerprint int) { - lbls := labels.Labels{ - labels.Label{Name: "mock", Value: strconv.Itoa(fingerprint)}} +func createStream(t *testing.T, inst *instance, fingerprint int) *stream { + lbls := labels.Labels{labels.Label{Name: "mock", Value: strconv.Itoa(fingerprint)}} - _, _, err := inst.streams.LoadOrStoreNew(lbls.String(), func() (*stream, error) { + stream, _, err := inst.streams.LoadOrStoreNew(lbls.String(), func() (*stream, error) { return inst.createStreamByFP(lbls, model.Fingerprint(fingerprint)) }, nil) require.NoError(t, err) + return stream } type mockTenantsSuplier struct {