diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e1cddb5fb2..d530c62c3d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -127,6 +127,7 @@ New deprecation(s):
 - **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))
 - **CPU scaler**: Wait for metrics window during CPU scaler tests ([#5294](https://github.com/kedacore/keda/pull/5294))
 - **Hashicorp Vault**: Improve test coverage in `pkg/scaling/resolver/hashicorpvault_handler` ([#5195](https://github.com/kedacore/keda/issues/5195))
+- **Kafka Scaler**: Add more test cases for large values of `lagThreshold` ([#5354](https://github.com/kedacore/keda/issues/5354))
 - **Openstack Scaler**: Use Gophercloud SDK ([#3439](https://github.com/kedacore/keda/issues/3439))
 
 ## v2.12.1
diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go
index ed09849496e..a056c11cbe5 100644
--- a/pkg/scalers/apache_kafka_scaler_test.go
+++ b/pkg/scalers/apache_kafka_scaler_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"strconv"
 	"testing"
 
 	"github.com/go-logr/logr"
@@ -76,6 +77,8 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
 	// failure, lagThreshold is 0
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
+	// success, lagThreshold is 1000000
+	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "1000000", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
 	// success, activationLagThreshold is 0
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
 	// success
@@ -231,78 +234,65 @@ var parseApacheKafkaAuthParamsTestDataset2 = []parseApacheKafkaAuthParamsTestDat
 }
 
 var apacheKafkaMetricIdentifiers = []apacheKafkaMetricIdentifier{
-	{&parseApacheKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topics"},
-	{&parseApacheKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topics"},
+	{&parseApacheKafkaMetadataTestDataset[11], 0, "s0-kafka-my-topics"},
+	{&parseApacheKafkaMetadataTestDataset[11], 1, "s1-kafka-my-topics"},
 	{&parseApacheKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
 }
 
 func TestApacheKafkaGetBrokers(t *testing.T) {
 	for _, testData := range parseApacheKafkaMetadataTestDataset {
 		meta, err := parseApacheKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams}, logr.Discard())
-
-		if err != nil && !testData.isError {
-			t.Error("Expected success but got error", err)
-		}
-		if testData.isError && err == nil {
-			t.Error("Expected error but got success")
-		}
-		if len(meta.bootstrapServers) != testData.numBrokers {
-			t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
-		}
-		if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
-			t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
-		}
-		if meta.group != testData.group {
-			t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
-		}
-		if !reflect.DeepEqual(testData.topic, meta.topic) {
-			t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
-		}
-		if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
-			t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
-		}
-		if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
-			t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
-		}
+		getBrokerApacheKafkaTestBase(t, meta, testData, err)
 
 		meta, err = parseApacheKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams}, logr.Discard())
-
-		if err != nil && !testData.isError {
-			t.Error("Expected success but got error", err)
-		}
-		if testData.isError && err == nil {
-			t.Error("Expected error but got success")
-		}
-		if len(meta.bootstrapServers) != testData.numBrokers {
-			t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
-		}
-		if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
-			t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
-		}
-		if meta.group != testData.group {
-			t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
-		}
-		if !reflect.DeepEqual(testData.topic, meta.topic) {
-			t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
-		}
-		if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
-			t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
-		}
-		if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
-			t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
-		}
-		if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
-			t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
-		}
-		if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
-			t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
-		}
-		if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
-			t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
-		}
+		getBrokerApacheKafkaTestBase(t, meta, testData, err)
 	}
 }
 
+func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testData parseApacheKafkaMetadataTestData, err error) {
+	if err != nil && !testData.isError {
+		t.Error("Expected success but got error", err)
+	}
+	if testData.isError && err == nil {
+		t.Error("Expected error but got success")
+	}
+	if len(meta.bootstrapServers) != testData.numBrokers {
+		t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
+	}
+	if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
+		t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
+	}
+	if meta.group != testData.group {
+		t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
+	}
+	if !reflect.DeepEqual(testData.topic, meta.topic) {
+		t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
+	}
+	if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
+		t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
+	}
+	if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
+		t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
+	}
+	if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
+		t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
+	}
+	if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
+		t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
+	}
+	if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
+		t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
+	}
+
+	expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata)
+	if er != nil {
+		t.Errorf("Unable to convert test data lagThreshold %s to int64", testData.metadata["lagThreshold"])
+	}
+
+	if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
+		t.Errorf("Expected lagThreshold to be either %v or %v but got %v\n", expectedLagThreshold, defaultKafkaLagThreshold, meta.lagThreshold)
+	}
+}
 func TestApacheKafkaAuthParams(t *testing.T) {
 	// Testing tls and sasl value in TriggerAuthentication
 	for _, testData := range parseApacheKafkaAuthParamsTestDataset {
@@ -382,3 +372,11 @@ func TestApacheKafkaGetMetricSpecForScaling(t *testing.T) {
 		}
 	}
 }
+
+func parseExpectedLagThreshold(metadata map[string]string) (int64, error) {
+	val, ok := metadata["lagThreshold"]
+	if !ok {
+		return 0, nil
+	}
+	return strconv.ParseInt(val, 10, 64)
+}
diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go
index 311a9805c6b..82e949a9478 100644
--- a/pkg/scalers/kafka_scaler_test.go
+++ b/pkg/scalers/kafka_scaler_test.go
@@ -84,6 +84,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
 	// failure, lagThreshold is 0
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
+	// success, lagThreshold is 1000000
+	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "1000000", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
 	// failure, activationLagThreshold is not int
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
 	// success, activationLagThreshold is 0
@@ -309,75 +311,62 @@ var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
 }
 
 var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
-	{&parseKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topic"},
-	{&parseKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[11], 0, "s0-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[11], 1, "s1-kafka-my-topic"},
 	{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
 }
 
 func TestGetBrokers(t *testing.T) {
 	for _, testData := range parseKafkaMetadataTestDataset {
 		meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard())
-
-		if err != nil && !testData.isError {
-			t.Error("Expected success but got error", err)
-		}
-		if testData.isError && err == nil {
-			t.Error("Expected error but got success")
-		}
-		if len(meta.bootstrapServers) != testData.numBrokers {
-			t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
-		}
-		if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
-			t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers)
-		}
-		if meta.group != testData.group {
-			t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
-		}
-		if meta.topic != testData.topic {
-			t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
-		}
-		if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
-			t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
-		}
-		if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
-			t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
-		}
+		getBrokerTestBase(t, meta, testData, err)
 
 		meta, err = parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithoutAuthParams}, logr.Discard())
+		getBrokerTestBase(t, meta, testData, err)
+	}
+}
 
-		if err != nil && !testData.isError {
-			t.Error("Expected success but got error", err)
-		}
-		if testData.isError && err == nil {
-			t.Error("Expected error but got success")
-		}
-		if len(meta.bootstrapServers) != testData.numBrokers {
-			t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
-		}
-		if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
-			t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers)
-		}
-		if meta.group != testData.group {
-			t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
-		}
-		if meta.topic != testData.topic {
-			t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
-		}
-		if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
-			t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
-		}
-		if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
-			t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
-		}
-		if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
-			t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
-		}
-		if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
-			t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
-		}
-		if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
-			t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
%t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag) - } +func getBrokerTestBase(t *testing.T, meta kafkaMetadata, testData parseKafkaMetadataTestData, err error) { + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if len(meta.bootstrapServers) != testData.numBrokers { + t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers)) + } + if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) { + t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers) + } + if meta.group != testData.group { + t.Errorf("Expected group %s but got %s\n", testData.group, meta.group) + } + if meta.topic != testData.topic { + t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) + } + if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) { + t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation) + } + if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { + t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) + } + if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { + t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) + } + if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { + t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) + } + if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag { + t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag) + } + expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata) + if er != nil { + t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"]) + } + + if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold { + t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold) } }