Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return 1 or 0 instead of -1 when detecting an invalid offset in Kafka #2621

Merged
merged 9 commits into from
Apr 7, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
- **Datadog Scaler:** Rely on Datadog API to validate the query ([2761](https://github.com/kedacore/keda/issues/2761))
- **Kafka Scaler:** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **Kafka Scaler:** New `scaleToZeroOnInvalidOffset` to control behavior when partitions have an invalid offset ([#2033](https://github.com/kedacore/keda/issues/2033)[#2612](https://github.com/kedacore/keda/issues/2612))
- **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317))
- **Prometheus Scaler:** Check and properly inform user that `threshold` is not set ([#2793](https://github.com/kedacore/keda/issues/2793))
- **Prometheus Scaler:** Support for `X-Scope-OrgID` header ([#2667](https://github.com/kedacore/keda/issues/2667))
Expand Down
123 changes: 76 additions & 47 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type kafkaMetadata struct {
allowIdleConsumers bool
version sarama.KafkaVersion

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
scaleToZeroOnInvalidOffset bool

// SASL
saslType kafkaSaslType
username string
Expand Down Expand Up @@ -101,6 +105,53 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
}, nil
}

func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
meta.saslType = KafkaSASLTypeNone
if val, ok := config.AuthParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaSaslType(val)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode
} else {
return fmt.Errorf("err SASL mode %s given", mode)
}
}

meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)

if val == "enable" {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.enableTLS = true
} else if val != "disable" {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}

return nil
}

func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -128,7 +179,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.topic = config.TriggerMetadata["topic"]
default:
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

Expand All @@ -152,47 +203,8 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.lagThreshold = t
}

meta.saslType = KafkaSASLTypeNone
if val, ok := config.AuthParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaSaslType(val)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
if config.AuthParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode
} else {
return meta, fmt.Errorf("err SASL mode %s given", mode)
}
}

meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)

if val == "enable" {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return meta, errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return meta, errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.enableTLS = true
} else if val != "disable" {
return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
}
if err := parseKafkaAuthParams(config, &meta); err != nil {
return meta, err
}

meta.allowIdleConsumers = false
Expand All @@ -204,6 +216,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.allowIdleConsumers = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %s", err)
}
meta.scaleToZeroOnInvalidOffset = t
}

meta.version = sarama.V1_0_0_0
if val, ok := config.TriggerMetadata["version"]; ok {
val = strings.TrimSpace(val)
Expand Down Expand Up @@ -343,17 +364,25 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
kafkaLog.Error(errMsg, "")
return 0, errMsg
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
retVal := int64(1)
if s.metadata.scaleToZeroOnInvalidOffset {
retVal = 0
}
msg := fmt.Sprintf(
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
kafkaLog.V(0).Info(msg)
return retVal, nil
}

if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding parition offset for topic %s", topic)
return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
Expand Down
11 changes: 11 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ func TestKafkaAuthParams(t *testing.T) {
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
}
}
}
}

Expand Down
Loading