Kafka scaler: batch GetOffset requests (#1956)
Signed-off-by: Lionel Villard <[email protected]>
lionelvillard authored Jul 13, 2021
1 parent 3eb5744 commit 959c870
Showing 2 changed files with 64 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@
- Allow influxdb `authToken`, `serverURL`, and `organizationName` to be sourced from `(Cluster)TriggerAuthentication` ([#1904](https://github.com/kedacore/keda/pull/1904))
- IBM MQ scaler password handling fix ([#1939](https://github.com/kedacore/keda/pull/1939))
- Metrics APIServer: Add ratelimiting parameters to override client ([#1944](https://github.com/kedacore/keda/pull/1944))
+- Optimize KafkaScaler by fetching all topic offsets using a single HTTP request ([#1956](https://github.com/kedacore/keda/pull/1956))

### Breaking Changes

71 changes: 63 additions & 8 deletions pkg/scalers/kafka_scaler.go
@@ -217,8 +217,13 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
		return false, err
	}

+	topicOffsets, err := s.getTopicOffsets(partitions)
+	if err != nil {
+		return false, err
+	}
+
	for _, partition := range partitions {
-		lag, err := s.getLagForPartition(partition, offsets)
+		lag, err := s.getLagForPartition(partition, offsets, topicOffsets)
		if err != nil && lag == invalidOffset {
			return true, nil
		}
@@ -312,7 +317,7 @@ func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchRespons
	return offsets, nil
}

-func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.OffsetFetchResponse) (int64, error) {
+func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) {
	block := offsets.GetBlock(s.metadata.topic, partition)
	if block == nil {
		kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition), "")
@@ -323,11 +328,8 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset
		kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
		return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
	}
-	latestOffset, err := s.client.GetOffset(s.metadata.topic, partition, sarama.OffsetNewest)
-	if err != nil {
-		kafkaLog.Error(err, fmt.Sprintf("error finding latest offset for topic %s and partition %d\n", s.metadata.topic, partition))
-		return 0, fmt.Errorf("error finding latest offset for topic %s and partition %d", s.metadata.topic, partition)
-	}
+
+	latestOffset := topicOffsets[partition]
	if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
		return latestOffset, nil
	}
@@ -372,9 +374,14 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
		return []external_metrics.ExternalMetricValue{}, err
	}

+	topicOffsets, err := s.getTopicOffsets(partitions)
+	if err != nil {
+		return []external_metrics.ExternalMetricValue{}, err
+	}
+
	totalLag := int64(0)
	for _, partition := range partitions {
-		lag, _ := s.getLagForPartition(partition, offsets)
+		lag, _ := s.getLagForPartition(partition, offsets, topicOffsets)

		totalLag += lag
	}
@@ -396,3 +403,51 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS

	return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
+
+func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) {
+	version := int16(0)
+	if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
+		version = 1
+	}
+
+	// Step 1: build one OffsetRequest instance per broker.
+	requests := make(map[*sarama.Broker]*sarama.OffsetRequest)
+
+	for _, partitionID := range partitions {
+		broker, err := s.client.Leader(s.metadata.topic, partitionID)
+		if err != nil {
+			return nil, err
+		}
+
+		request, ok := requests[broker]
+		if !ok {
+			request = &sarama.OffsetRequest{Version: version}
+			requests[broker] = request
+		}
+
+		request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
+	}
+
+	offsets := make(map[int32]int64)
+
+	// Step 2: send requests, one per broker, and collect offsets
+	for broker, request := range requests {
+		response, err := broker.GetAvailableOffsets(request)
+
+		if err != nil {
+			return nil, err
+		}
+
+		for _, blocks := range response.Blocks {
+			for partitionID, block := range blocks {
+				if block.Err != sarama.ErrNoError {
+					return nil, block.Err
+				}
+
+				offsets[partitionID] = block.Offset
+			}
+		}
+	}
+
+	return offsets, nil
+}
