Skip to content

Commit

Permalink
fix(kafka): Handle Sarama errors properly (#3452)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Turrado Ferrero authored Aug 2, 2022
1 parent 81ae8e9 commit 8bc4c4a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issues/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Kafka Scaler:** Handle Sarama errors properly ([#3056](https://github.com/kedacore/keda/issues/3056))
- **Kafka Scaler:** Support of passphrase encrypted PKCS #8 private key ([#3449](https://github.com/kedacore/keda/issues/3449))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
Expand Down
19 changes: 19 additions & 0 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
if err != nil {
return nil, fmt.Errorf("error listing cg offset: %s", err)
}

if listCGOffsetResponse.Err > 0 {
errMsg := fmt.Errorf("error listing cg offset: %s", listCGOffsetResponse.Err.Error())
kafkaLog.Error(errMsg, "")
}

for topicName := range listCGOffsetResponse.Blocks {
topicsToDescribe = append(topicsToDescribe, topicName)
}
Expand All @@ -350,6 +356,10 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {

topicPartitions := make(map[string][]int32, len(topicsMetadata))
for _, topicMetadata := range topicsMetadata {
if topicMetadata.Err > 0 {
errMsg := fmt.Errorf("error describing topics: %s", topicMetadata.Err.Error())
kafkaLog.Error(errMsg, "")
}
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
Expand All @@ -365,6 +375,10 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
if err != nil {
return nil, fmt.Errorf("error listing consumer group offsets: %s", err)
}
if offsets.Err > 0 {
errMsg := fmt.Errorf("error listing consumer group offsets: %s", offsets.Err.Error())
kafkaLog.Error(errMsg, "")
}
return offsets, nil
}

Expand All @@ -375,6 +389,11 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
kafkaLog.Error(errMsg, "")
return 0, errMsg
}
if block.Err > 0 {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error())
kafkaLog.Error(errMsg, "")
}

consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
retVal := int64(1)
Expand Down

0 comments on commit 8bc4c4a

Please sign in to comment.