Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka): Handle Sarama errors properly #3452

Merged
merged 2 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issues/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Kafka Scaler:** Handle Sarama errors properly ([#3056](https://github.com/kedacore/keda/issues/3056))
- **Prometheus Scaler:** Add ignoreNullValues to return an error when Prometheus returns null values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
- **Selenium Grid Scaler:** Max Sessions implementation issue ([#3061](https://github.com/kedacore/keda/issues/3061))
Expand Down
19 changes: 19 additions & 0 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
if err != nil {
return nil, fmt.Errorf("error listing cg offset: %s", err)
}

if listCGOffsetResponse.Err > 0 {
errMsg := fmt.Errorf("error listing cg offset: %s", listCGOffsetResponse.Err.Error())
kafkaLog.Error(errMsg, "")
}

for topicName := range listCGOffsetResponse.Blocks {
topicsToDescribe = append(topicsToDescribe, topicName)
}
Expand All @@ -344,6 +350,10 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {

topicPartitions := make(map[string][]int32, len(topicsMetadata))
for _, topicMetadata := range topicsMetadata {
if topicMetadata.Err > 0 {
errMsg := fmt.Errorf("error describing topics: %s", topicMetadata.Err.Error())
kafkaLog.Error(errMsg, "")
}
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
Expand All @@ -359,6 +369,10 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
if err != nil {
return nil, fmt.Errorf("error listing consumer group offsets: %s", err)
}
if offsets.Err > 0 {
errMsg := fmt.Errorf("error listing consumer group offsets: %s", offsets.Err.Error())
kafkaLog.Error(errMsg, "")
}
return offsets, nil
}

Expand All @@ -369,6 +383,11 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
kafkaLog.Error(errMsg, "")
return 0, errMsg
}
if block.Err > 0 {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error())
kafkaLog.Error(errMsg, "")
}

consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
retVal := int64(1)
Expand Down