diff --git a/tests/scalers/apache_kafka/apache_kafka_test.go b/tests/scalers/apache_kafka/apache_kafka_test.go
index 05de962a9a6..ecabba13ca1 100644
--- a/tests/scalers/apache_kafka/apache_kafka_test.go
+++ b/tests/scalers/apache_kafka/apache_kafka_test.go
@@ -205,7 +205,7 @@ spec:
       lagThreshold: '1'
       offsetResetPolicy: 'latest'`
 
-	invalidOffsetScaledObjectTemplate = `
+	invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
 apiVersion: keda.sh/v1alpha1
 kind: ScaledObject
 metadata:
@@ -243,6 +243,44 @@ spec:
       scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
       offsetResetPolicy: 'latest'`
 
+	invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+  labels:
+    app: {{.DeploymentName}}
+spec:
+  pollingInterval: 5
+  cooldownPeriod: 0
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  advanced:
+    horizontalPodAutoscalerConfig:
+      behavior:
+        scaleUp:
+          stabilizationWindowSeconds: 0
+          policies:
+          - type: Percent
+            value: 100
+            periodSeconds: 15
+        scaleDown:
+          stabilizationWindowSeconds: 0
+          policies:
+          - type: Percent
+            value: 100
+            periodSeconds: 15
+  triggers:
+  - type: kafka
+    metadata:
+      topic: {{.TopicName}}
+      bootstrapServers: {{.BootstrapServer}}
+      consumerGroup: {{.ResetPolicy}}
+      lagThreshold: '1'
+      scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
+      offsetResetPolicy: 'earliest'`
+
 	persistentLagScaledObjectTemplate = `
 apiVersion: keda.sh/v1alpha1
 kind: ScaledObject
@@ -518,8 +556,10 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa
 	data.ScaleToZeroOnInvalid = StringTrue
 	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
 	defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
-	KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
-	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
 
 	// Shouldn't scale pods
 	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
@@ -534,8 +574,10 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat
 	data.ScaleToZeroOnInvalid = StringFalse
 	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
 	defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
-	KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) - defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate) // Should scale to 1 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), @@ -570,7 +616,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData t.Log("--- testing persistentLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup) data.Commit = StringTrue data.TopicName = persistentLagTopic @@ -583,7 +629,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData publishMessage(t, persistentLagTopic) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count should be %d after 2 minute", 1) - // Recreate Deployment to delibrately assign different consumer group to deployment and scaled object + // Recreate Deployment to deliberately assign different consumer group to deployment and scaled object // This is to simulate inability to consume from topic // Scaled Object remains unchanged KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace) @@ -613,7 +659,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da t.Log("--- testing limitToPartitionsWithLag: no scale out ---") // Simulate Consumption from topic by consumer group - // To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit) + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) commitPartition(t, limitToPartitionsWithLagTopic, "latest") data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index 8118e81f700..d2d3eac4831 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes" + + . 
"github.com/kedacore/keda/v2/tests/helper" ) // Load environment variables from .env file