Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic to scale to zero on invalid offset even with earliest offsetResetPolicy #5689

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))

### Fixes

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}
producerOffset := producerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return producerOffset, producerOffset, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,9 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return latestOffset, latestOffset, nil
}

Expand Down
105 changes: 93 additions & 12 deletions tests/scalers/apache_kafka/apache_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand Down Expand Up @@ -243,6 +243,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
Expand Down Expand Up @@ -407,8 +445,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
Expand Down Expand Up @@ -509,7 +549,7 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
Expand All @@ -518,14 +558,54 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
Expand All @@ -534,8 +614,9 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
Expand Down Expand Up @@ -570,7 +651,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
Expand All @@ -583,7 +664,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
Expand Down Expand Up @@ -613,7 +694,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
Expand Down
109 changes: 95 additions & 14 deletions tests/scalers/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand Down Expand Up @@ -242,6 +242,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
Expand Down Expand Up @@ -406,8 +444,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
Expand Down Expand Up @@ -507,33 +547,74 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
Expand Down Expand Up @@ -568,7 +649,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
Expand All @@ -581,7 +662,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
Expand Down Expand Up @@ -611,7 +692,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
Expand Down
Loading