From 8f9f33a85b8f2b27f56474fef44aeb2329d4344f Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Fri, 12 Apr 2024 22:18:32 +0100 Subject: [PATCH 1/5] Add logic to scale to zero on invalid offset event with earliest offsetResetPolicy Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 3 ++ pkg/scalers/kafka_scaler.go | 3 ++ tests/scalers/kafka/kafka_test.go | 60 +++++++++++++++++++++++++----- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index bc13daf3fb9..890cc7d61cf 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -531,6 +531,9 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co } producerOffset := producerOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { + if s.metadata.scaleToZeroOnInvalidOffset { + return 0, 0, nil + } return producerOffset, producerOffset, nil } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index ec9c7fc5927..1d05b4a6527 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -805,6 +805,9 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { + if s.metadata.scaleToZeroOnInvalidOffset { + return 0, 0, nil + } return latestOffset, latestOffset, nil } diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index 550b3fb8138..8118e81f700 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -12,8 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes" - - . "github.com/kedacore/keda/v2/tests/helper" ) // Load environment variables from .env file @@ -204,7 +202,7 @@ spec: lagThreshold: '1' offsetResetPolicy: 'latest'` - invalidOffsetScaledObjectTemplate = ` + invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -242,6 +240,44 @@ spec: scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' offsetResetPolicy: 'latest'` + invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 'earliest'` + persistentLagScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -516,8 +552,10 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa data.ScaleToZeroOnInvalid = StringTrue KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Shouldn't scale pods AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) @@ -532,8 +570,10 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat data.ScaleToZeroOnInvalid = StringFalse KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Should scale to 1 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), @@ -568,7 +608,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData t.Log("--- testing persistentLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup) data.Commit = StringTrue data.TopicName = persistentLagTopic @@ -581,7 +621,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData publishMessage(t, persistentLagTopic) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count should be %d after 2 minute", 1) - // Recreate Deployment to delibrately assign different consumer group to deployment and scaled object + // Recreate Deployment to deliberately assign different consumer group to deployment and scaled object // This is to simulate inability to consume from topic // Scaled Object remains unchanged KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace) @@ -611,7 +651,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da t.Log("--- testing limitToPartitionsWithLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) commitPartition(t, limitToPartitionsWithLagTopic, "latest") data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) From 54c80594277e5b0c7dbaa75b1d204a6e0eeb2d11 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Sat, 13 Apr 2024 22:10:48 +0100 Subject: [PATCH 2/5] Fix failed CI tests and changes to apache_kafka_test.go as well Signed-off-by: dttung2905 --- .../scalers/apache_kafka/apache_kafka_test.go | 62 ++++++++++++++++--- tests/scalers/kafka/kafka_test.go | 2 + 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/tests/scalers/apache_kafka/apache_kafka_test.go b/tests/scalers/apache_kafka/apache_kafka_test.go index 05de962a9a6..ecabba13ca1 100644 --- a/tests/scalers/apache_kafka/apache_kafka_test.go +++ b/tests/scalers/apache_kafka/apache_kafka_test.go @@ -205,7 +205,7 @@ spec: lagThreshold: '1' offsetResetPolicy: 'latest'` - invalidOffsetScaledObjectTemplate = ` + invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -243,6 +243,44 @@ spec: scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' offsetResetPolicy: 'latest'` + invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 'earliest'` + persistentLagScaledObjectTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -518,8 +556,12 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa data.ScaleToZeroOnInvalid = StringTrue KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Shouldn't scale pods AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) @@ -534,8 +576,12 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat data.ScaleToZeroOnInvalid = StringFalse KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Should scale to 1 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), @@ -570,7 +616,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData t.Log("--- testing persistentLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup) data.Commit = StringTrue data.TopicName = persistentLagTopic @@ -583,7 +629,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData publishMessage(t, persistentLagTopic) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count should be %d after 2 minute", 1) - // Recreate Deployment to delibrately assign different consumer group to deployment and scaled object + // Recreate Deployment to deliberately assign different consumer group to deployment and scaled object // This is to simulate inability to consume from topic // Scaled Object remains unchanged KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace) @@ -613,7 +659,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da t.Log("--- testing limitToPartitionsWithLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) commitPartition(t, limitToPartitionsWithLagTopic, "latest") data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index 8118e81f700..d2d3eac4831 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" ) // Load environment variables from .env file From 6c34d743b085aca8e7789bf1b6255d64f3f9d666 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Wed, 17 Apr 2024 23:11:05 +0100 Subject: [PATCH 3/5] Fix test cases Signed-off-by: dttung2905 --- tests/scalers/kafka/kafka_test.go | 50 +++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index d2d3eac4831..f231a78c4fd 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -444,8 +444,10 @@ func TestScaler(t *testing.T) { testEarliestPolicy(t, kc, data) testLatestPolicy(t, kc, data) testMultiTopic(t, kc, data) - testZeroOnInvalidOffset(t, kc, data) - testOneOnInvalidOffset(t, kc, data) + testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) testPersistentLag(t, kc, data) testScalingOnlyPartitionsWithLag(t, kc, data) } @@ -545,8 +547,8 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { "replica count should be %d after 2 minute", 2) } -func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing zeroInvalidOffsetTopic: scale out ---") +func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = zeroInvalidOffsetTopic @@ -556,6 +558,20 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringTrue + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) @@ -563,8 +579,8 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing oneInvalidOffsetTopic: scale out ---") +func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = oneInvalidOffsetTopic @@ -574,6 +590,28 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) +} + +func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringFalse + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) From 3b22a35bcb6d92c88f811dd6397b71ee392b4fde Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Mon, 6 May 2024 22:34:03 +0100 Subject: [PATCH 4/5] Fixed failed tests Signed-off-by: dttung2905 --- .../scalers/apache_kafka/apache_kafka_test.go | 53 +++++++++++++++---- tests/scalers/kafka/kafka_test.go | 1 + 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/tests/scalers/apache_kafka/apache_kafka_test.go b/tests/scalers/apache_kafka/apache_kafka_test.go index ecabba13ca1..1d4cf3ae48c 100644 --- a/tests/scalers/apache_kafka/apache_kafka_test.go +++ b/tests/scalers/apache_kafka/apache_kafka_test.go @@ -445,8 +445,10 @@ func TestScaler(t *testing.T) { testEarliestPolicy(t, kc, data) testLatestPolicy(t, kc, data) testMultiTopic(t, kc, data) - testZeroOnInvalidOffset(t, kc, data) - testOneOnInvalidOffset(t, kc, data) + testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data) + testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data) testPersistentLag(t, kc, data) testScalingOnlyPartitionsWithLag(t, kc, data) } @@ -547,7 +549,7 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { "replica count should be %d after 2 minute", 2) } -func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { +func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing zeroInvalidOffsetTopic: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue @@ -558,8 +560,20 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringTrue + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) @@ -567,8 +581,8 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing oneInvalidOffsetTopic: scale out ---") +func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---") data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) data.Commit = StringTrue data.TopicName = oneInvalidOffsetTopic @@ -578,8 +592,29 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) - KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) +} + +func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopic: scale out ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = StringTrue + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = StringFalse + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0 KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index f231a78c4fd..accf7e293f2 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -612,6 +612,7 @@ func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kuber data.ScaleToZeroOnInvalid = StringFalse KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0 KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) From 95351b8a58f998a1b7b65ef3154bf24185647ee9 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Mon, 6 May 2024 23:01:12 +0100 Subject: [PATCH 5/5] Update CHANGELOG Signed-off-by: dttung2905 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cff8e5aa7b..52d7812cf3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) +- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) ### Fixes