Commit 5627770
Fix failed CI tests and changes to apache_kafka_test.go as well
Signed-off-by: dttung2905 <[email protected]>
dttung2905 committed Apr 13, 2024
1 parent 91a52b0 commit 5627770
Showing 2 changed files with 56 additions and 8 deletions.
62 changes: 54 additions & 8 deletions tests/scalers/apache_kafka/apache_kafka_test.go
@@ -205,7 +205,7 @@ spec:
       lagThreshold: '1'
       offsetResetPolicy: 'latest'`
 
-	invalidOffsetScaledObjectTemplate = `
+	invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
 apiVersion: keda.sh/v1alpha1
 kind: ScaledObject
 metadata:
@@ -243,6 +243,44 @@ spec:
       scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
       offsetResetPolicy: 'latest'`
 
+	invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+  labels:
+    app: {{.DeploymentName}}
+spec:
+  pollingInterval: 5
+  cooldownPeriod: 0
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  advanced:
+    horizontalPodAutoscalerConfig:
+      behavior:
+        scaleUp:
+          stabilizationWindowSeconds: 0
+          policies:
+            - type: Percent
+              value: 100
+              periodSeconds: 15
+        scaleDown:
+          stabilizationWindowSeconds: 0
+          policies:
+            - type: Percent
+              value: 100
+              periodSeconds: 15
+  triggers:
+  - type: kafka
+    metadata:
+      topic: {{.TopicName}}
+      bootstrapServers: {{.BootstrapServer}}
+      consumerGroup: {{.ResetPolicy}}
+      lagThreshold: '1'
+      scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
+      offsetResetPolicy: 'earliest'`
+
 	persistentLagScaledObjectTemplate = `
 apiVersion: keda.sh/v1alpha1
 kind: ScaledObject
@@ -518,8 +556,12 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	data.ScaleToZeroOnInvalid = StringTrue
 	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
 	defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
-	KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
-	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
 
 	// Shouldn't scale pods
 	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
@@ -534,8 +576,12 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	data.ScaleToZeroOnInvalid = StringFalse
 	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
 	defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
-	KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
-	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
+	KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
+	defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
 
 	// Should scale to 1
 	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
@@ -570,7 +616,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	t.Log("--- testing persistentLag: no scale out ---")
 
 	// Simulate Consumption from topic by consumer group
-	// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
+	// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
 	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
 	data.Commit = StringTrue
 	data.TopicName = persistentLagTopic
@@ -583,7 +629,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	publishMessage(t, persistentLagTopic)
 	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
 		"replica count should be %d after 2 minute", 1)
-	// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
+	// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
 	// This is to simulate inability to consume from topic
 	// Scaled Object remains unchanged
 	KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
@@ -613,7 +659,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	t.Log("--- testing limitToPartitionsWithLag: no scale out ---")
 
 	// Simulate Consumption from topic by consumer group
-	// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
+	// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
 	commitPartition(t, limitToPartitionsWithLagTopic, "latest")
 
 	data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
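Note on the helper calls above: KubectlApplyWithTemplate and KubectlDeleteWithTemplate come from the shared e2e helper package and render these Go text/template strings with the test's data struct before handing the result to kubectl. Below is a minimal sketch of just the rendering step, assuming standard text/template semantics; the struct fields mirror the placeholders visible in the diff, the literal values are hypothetical, and the real helper in tests/helper also pipes the output through kubectl apply rather than stdout.

package main

import (
	"os"
	"text/template"
)

// templateData mirrors the placeholder names used by the ScaledObject
// templates in this diff. The actual struct in the KEDA e2e suite has
// more fields; this is an illustrative subset, not the real type.
type templateData struct {
	ScaledObjectName     string
	TestNamespace        string
	DeploymentName       string
	TopicName            string
	BootstrapServer      string
	ResetPolicy          string
	ScaleToZeroOnInvalid string
}

// A trimmed-down version of the "earliest" template added above.
const invalidOffsetEarliestTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: {{.ScaledObjectName}}
  namespace: {{.TestNamespace}}
spec:
  scaleTargetRef:
    name: {{.DeploymentName}}
  triggers:
  - type: kafka
    metadata:
      topic: {{.TopicName}}
      bootstrapServers: {{.BootstrapServer}}
      consumerGroup: {{.ResetPolicy}}
      lagThreshold: '1'
      scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
      offsetResetPolicy: 'earliest'`

func main() {
	// Render the manifest; the real helper applies output like this
	// with kubectl instead of printing it.
	tmpl := template.Must(template.New("scaledObject").Parse(invalidOffsetEarliestTemplate))
	data := templateData{
		ScaledObjectName:     "invalid-offset-so", // hypothetical values
		TestNamespace:        "apache-kafka-test-ns",
		DeploymentName:       "kafka-consumer",
		TopicName:            "invalid-offset-topic",
		BootstrapServer:      "kafka-broker:9092",
		ResetPolicy:          "earliest",
		ScaleToZeroOnInvalid: "true",
	}
	if err := tmpl.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}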
2 changes: 2 additions & 0 deletions tests/scalers/kafka/kafka_test.go
@@ -12,6 +12,8 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"k8s.io/client-go/kubernetes"
+
+	. "github.com/kedacore/keda/v2/tests/helper"
 )
 
 // Load environment variables from .env file
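The only functional change in this file is the dot-import of the e2e helper package: a dot-import promotes a package's exported identifiers into the importing file's scope, which is why the tests can call helpers such as KubectlApplyWithTemplate and WaitForDeploymentReplicaReadyCount without a package qualifier. A tiny self-contained illustration of the mechanism, using the standard library strings package as a stand-in for the helper package:

package main

import (
	"fmt"
	// Dot-importing brings the package's exported names directly into
	// this file's scope; "strings" stands in for the KEDA tests/helper
	// package here.
	. "strings"
)

func main() {
	// Equivalent to strings.ToUpper("keda"): no package qualifier needed,
	// just as the Kafka tests call KubectlApplyWithTemplate directly.
	fmt.Println(ToUpper("keda"))
}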

0 comments on commit 5627770
