From 9c6234dc8013aeba00efd974ed00c4fb07811d37 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Thu, 10 Feb 2022 09:42:38 +0200 Subject: [PATCH] Return 0 instead of -1 when detecting an invalid offset in Kafka When getting the consumer offset of a Kafka topic for which no offset was committed yet, the scaler returns with -1 instead of 0, which causes to scale to the maximum number of replicas. Also fixed some typos and used interim variables for error strings. Fixes #2612 Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index c2c66c4211d..2cce3df385d 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -121,7 +121,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.topic = config.TriggerMetadata["topic"] default: meta.topic = "" - kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+ + kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) } @@ -336,17 +336,19 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) { block := offsets.GetBlock(topic, partitionID) if block == nil { - kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "") - return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) + errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) + kafkaLog.Error(errMsg, "") + return 0, errMsg } consumerOffset := block.Offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { - kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)) - return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID) + errMsg := fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID) + kafkaLog.V(0).Info(errMsg.Error()) + return 0, errMsg } if _, found := topicPartitionOffsets[topic]; !found { - return 0, fmt.Errorf("error finding parition offset for topic %s", topic) + return 0, fmt.Errorf("error finding partition offset for topic %s", topic) } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {