From 3a99d0229239b6551aceb2cede371d0f7b70e6b7 Mon Sep 17 00:00:00 2001 From: Patryk Prus Date: Wed, 21 Feb 2024 17:49:37 -0500 Subject: [PATCH 1/4] Switch ingester limiter to use ingesterRing rather than lifecycler --- pkg/ingester/ingester.go | 2 +- pkg/ingester/limiter.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4a9ee20ff94..b324ece141c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -408,7 +408,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, // Init the limter and instantiate the user states which depend on it i.limiter = NewLimiter( limits, - i.lifecycler, + NewLimiterRing(ingestersRing, cfg.IngesterRing.InstanceZone), cfg.IngesterRing.ReplicationFactor, cfg.IngesterRing.ZoneAwarenessEnabled, ) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index c59ca795b5d..953e35263ae 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -8,6 +8,8 @@ package ingester import ( "math" + "github.com/grafana/dskit/ring" + "github.com/grafana/mimir/pkg/util" util_math "github.com/grafana/mimir/pkg/util/math" "github.com/grafana/mimir/pkg/util/validation" @@ -21,6 +23,23 @@ type RingCount interface { ZonesCount() int } +// Wraps ring.ReadRing to implement RingCount +type LimiterRing struct { + ring.ReadRing + zone string +} + +func NewLimiterRing(ring ring.ReadRing, zone string) *LimiterRing { + return &LimiterRing{ + ReadRing: ring, + zone: zone, + } +} + +func (r *LimiterRing) InstancesInZoneCount() int { + return r.ReadRing.InstancesInZoneCount(r.zone) +} + // Limiter implements primitives to get the maximum number of series // an ingester can handle for a specific tenant type Limiter struct { From c5bce98ad60bea03a4bbdf9286504b782233dfd5 Mon Sep 17 00:00:00 2001 From: Patryk Prus Date: Wed, 21 Feb 2024 18:25:11 -0500 Subject: [PATCH 2/4] Start ingester ring in tests --- pkg/ingester/ingester_ingest_storage_test.go | 10 +++++++++- pkg/ingester/ingester_test.go | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_ingest_storage_test.go b/pkg/ingester/ingester_ingest_storage_test.go index 899dffb81a6..7296d22a4ca 100644 --- a/pkg/ingester/ingester_ingest_storage_test.go +++ b/pkg/ingester/ingester_ingest_storage_test.go @@ -408,7 +408,15 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over // Disable TSDB head compaction jitter to have predictable tests. ingesterCfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false - ingester, err := New(*ingesterCfg, overrides, nil, nil, reg, util_test.NewTestingLogger(t)) + // Start the ingester ring + rng, err := ring.New(ingesterCfg.IngesterRing.ToRingConfig(), "ingester", IngesterRingKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), rng)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), rng)) + }) + + ingester, err := New(*ingesterCfg, overrides, rng, nil, reg, util_test.NewTestingLogger(t)) require.NoError(t, err) return ingester, kafkaCluster diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 87b8e3e245d..a28de3ca719 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4596,7 +4596,15 @@ func prepareIngesterWithBlockStorageAndOverrides(t testing.TB, ingesterCfg Confi // Disable TSDB head compaction jitter to have predictable tests. ingesterCfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false - ingester, err := New(ingesterCfg, overrides, nil, nil, registerer, noDebugNoopLogger{}) + // Start the ingester ring + rng, err := ring.New(ingesterCfg.IngesterRing.ToRingConfig(), "ingester", IngesterRingKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), rng)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), rng)) + }) + + ingester, err := New(ingesterCfg, overrides, rng, nil, registerer, noDebugNoopLogger{}) if err != nil { return nil, err } From 52ea1722dc74cd79c4a575c92a66ed44faca487f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 22 Feb 2024 10:24:13 +0100 Subject: [PATCH 3/4] Removed LimiterRing wrapper, pass zone to Limiter. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester.go | 3 +- pkg/ingester/limiter.go | 26 +++--------- pkg/ingester/limiter_test.go | 47 ++++++++-------------- pkg/ingester/user_metrics_metadata_test.go | 12 ++---- pkg/ingester/user_tsdb_test.go | 2 +- 5 files changed, 29 insertions(+), 61 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b324ece141c..9d79538e3a4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -408,9 +408,10 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, // Init the limter and instantiate the user states which depend on it i.limiter = NewLimiter( limits, - NewLimiterRing(ingestersRing, cfg.IngesterRing.InstanceZone), + ingestersRing, cfg.IngesterRing.ReplicationFactor, cfg.IngesterRing.ZoneAwarenessEnabled, + cfg.IngesterRing.InstanceZone, ) if cfg.ReadPathCPUUtilizationLimit > 0 || cfg.ReadPathMemoryUtilizationLimit > 0 { diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 953e35263ae..8d33e9b9db9 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -8,8 +8,6 @@ package ingester import ( "math" - "github.com/grafana/dskit/ring" - "github.com/grafana/mimir/pkg/util" util_math "github.com/grafana/mimir/pkg/util/math" "github.com/grafana/mimir/pkg/util/validation" @@ -19,27 +17,10 @@ import ( // to count members type RingCount interface { InstancesCount() int - InstancesInZoneCount() int + InstancesInZoneCount(zone string) int ZonesCount() int } -// Wraps ring.ReadRing to implement RingCount -type LimiterRing struct { - ring.ReadRing - zone string -} - -func NewLimiterRing(ring ring.ReadRing, zone string) *LimiterRing { - return &LimiterRing{ - ReadRing: ring, - zone: zone, - } -} - -func (r *LimiterRing) InstancesInZoneCount() int { - return r.ReadRing.InstancesInZoneCount(r.zone) -} - // Limiter implements primitives to get the maximum number of series // an ingester can handle for a specific tenant type Limiter struct { @@ -47,6 +28,7 @@ type Limiter struct { ring RingCount replicationFactor int zoneAwarenessEnabled bool + ingesterZone string } // NewLimiter makes a new in-memory series limiter @@ -55,12 +37,14 @@ func NewLimiter( ring RingCount, replicationFactor int, zoneAwarenessEnabled bool, + ingesterZone string, ) *Limiter { return &Limiter{ limits: limits, ring: ring, replicationFactor: replicationFactor, zoneAwarenessEnabled: zoneAwarenessEnabled, + ingesterZone: ingesterZone, } } @@ -131,7 +115,7 @@ func (l *Limiter) convertGlobalToLocalLimit(userShardSize int, globalLimit int) if zonesCount > 1 { // In this case zone-aware replication is enabled, and ingestersInZoneCount is initially set to // the total number of ingesters in the corresponding zone - ingestersInZoneCount = l.ring.InstancesInZoneCount() + ingestersInZoneCount = l.ring.InstancesInZoneCount(l.ingesterZone) } else { // In this case zone-aware replication is disabled, and ingestersInZoneCount is initially set to // the total number of ingesters diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index d9ecbeb7321..895a68e6b5c 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/util/validation" @@ -287,10 +286,7 @@ func runLimiterMaxFunctionTest( t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(testData.ringIngesterCount) - ring.On("InstancesInZoneCount").Return(testData.ingestersInZoneCount) - ring.On("ZonesCount").Return(testData.ringZonesCount) + ring := &ringCountMock{instancesCount: testData.ringIngesterCount, zonesCount: testData.ringZonesCount, instancesInZoneCount: testData.ingestersInZoneCount} // Mock limits limits := validation.Limits{IngestionTenantShardSize: testData.shardSize} @@ -299,7 +295,7 @@ func runLimiterMaxFunctionTest( overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled) + limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "zone") actual := runMaxFn(limiter) assert.Equal(t, testData.expectedValue, actual) }) @@ -342,9 +338,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) { t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(testData.ringIngesterCount) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: testData.ringIngesterCount, zonesCount: 1} // Mock limits limits, err := validation.NewOverrides(validation.Limits{ @@ -352,7 +346,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) { }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false) + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, "") actual := limiter.IsWithinMaxSeriesPerMetric("test", testData.series) assert.Equal(t, testData.expected, actual) @@ -395,9 +389,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) { t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(testData.ringIngesterCount) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: testData.ringIngesterCount, zonesCount: 1} // Mock limits limits, err := validation.NewOverrides(validation.Limits{ @@ -405,7 +397,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) { }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false) + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, "") actual := limiter.IsWithinMaxMetadataPerMetric("test", testData.metadata) assert.Equal(t, testData.expected, actual) @@ -449,9 +441,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) { t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(testData.ringIngesterCount) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: testData.ringIngesterCount, zonesCount: 1} // Mock limits limits, err := validation.NewOverrides(validation.Limits{ @@ -459,7 +449,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) { }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false) + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, "") actual := limiter.IsWithinMaxSeriesPerUser("test", testData.series, limiter.getShardSize("test")) assert.Equal(t, testData.expected, actual) @@ -503,9 +493,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) { t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(testData.ringIngesterCount) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: testData.ringIngesterCount, zonesCount: 1} // Mock limits limits, err := validation.NewOverrides(validation.Limits{ @@ -513,7 +501,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) { }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false) + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, "") actual := limiter.IsWithinMaxMetricsWithMetadataPerUser("test", testData.metadata) assert.Equal(t, testData.expected, actual) @@ -522,20 +510,19 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) { } type ringCountMock struct { - mock.Mock + instancesCount int + instancesInZoneCount int + zonesCount int } func (m *ringCountMock) InstancesCount() int { - args := m.Called() - return args.Int(0) + return m.instancesCount } -func (m *ringCountMock) InstancesInZoneCount() int { - args := m.Called() - return args.Int(0) +func (m *ringCountMock) InstancesInZoneCount(zone string) int { + return m.instancesInZoneCount } func (m *ringCountMock) ZonesCount() int { - args := m.Called() - return args.Int(0) + return m.zonesCount } diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go index 3702bbd1a5c..1402d63878e 100644 --- a/pkg/ingester/user_metrics_metadata_test.go +++ b/pkg/ingester/user_metrics_metadata_test.go @@ -74,16 +74,14 @@ func TestUserMetricsMetadata(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(1) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: 1, zonesCount: 1} limits, err := validation.NewOverrides(validation.Limits{ MaxGlobalMetricsWithMetadataPerUser: testData.maxMetadataPerUser, MaxGlobalMetadataPerMetric: testData.maxMetadataPerMetric, }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, 1, false) + limiter := NewLimiter(limits, ring, 1, false, "") metrics := newIngesterMetrics( prometheus.NewPedanticRegistry(), @@ -132,13 +130,11 @@ func (t noopTestingT) Errorf(string, ...interface{}) {} func TestUserMetricsMetadataRequest(t *testing.T) { // Mock the ring - ring := &ringCountMock{} - ring.On("InstancesCount").Return(1) - ring.On("ZonesCount").Return(1) + ring := &ringCountMock{instancesCount: 1, zonesCount: 1} limits, err := validation.NewOverrides(validation.Limits{}, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, 1, false) + limiter := NewLimiter(limits, ring, 1, false, "") metrics := newIngesterMetrics( prometheus.NewPedanticRegistry(), diff --git a/pkg/ingester/user_tsdb_test.go b/pkg/ingester/user_tsdb_test.go index c29cfde94e2..85697f1b254 100644 --- a/pkg/ingester/user_tsdb_test.go +++ b/pkg/ingester/user_tsdb_test.go @@ -217,7 +217,7 @@ func TestGetSeriesAndShardsForSeriesLimit(t *testing.T) { db := userTSDB{ db: tsdbDB, - limiter: NewLimiter(overrides, nil, 3, true), + limiter: NewLimiter(overrides, nil, 3, true, "zone"), ownedSeriesCount: 555, ownedSeriesShardSize: 333, } From ea30058e331601d6967ff03e276988026d149aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 22 Feb 2024 10:27:26 +0100 Subject: [PATCH 4/4] Make linter happy. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/limiter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 895a68e6b5c..4e416de0331 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -519,7 +519,7 @@ func (m *ringCountMock) InstancesCount() int { return m.instancesCount } -func (m *ringCountMock) InstancesInZoneCount(zone string) int { +func (m *ringCountMock) InstancesInZoneCount(_ string) int { return m.instancesInZoneCount }