Skip to content

Commit

Permalink
Return 0 instead of -1 when detecting an invalid offset in Kafka
Browse files Browse the repository at this point in the history
When getting the consumer offset of a Kafka topic for which no offset
was committed yet, the scaler returns with -1 instead of 0, which
causes to scale to the maximum number of replicas.

Also fixed some typos and used interim variables for error strings.

Fixes kedacore#2612

Signed-off-by: Ram Cohen <[email protected]>
  • Loading branch information
RamCohen committed Feb 10, 2022
1 parent 2101434 commit 8d67bc2
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.topic = config.TriggerMetadata["topic"]
default:
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

Expand Down Expand Up @@ -336,17 +336,19 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
kafkaLog.Error(errMsg, "")
return 0, errMsg
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
errMsg := fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
kafkaLog.V(0).Info(errMsg.Error())
return 0, errMsg
}

if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding parition offset for topic %s", topic)
return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
Expand Down

0 comments on commit 8d67bc2

Please sign in to comment.